mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 11:41:49 +02:00
refactor(bec_figure): BECFigure removed
This commit is contained in:
@ -57,26 +57,6 @@ def test_bec_dock_area_add_remove_dock(bec_dock_area, qtbot):
|
||||
assert d2.name() in dict(bec_dock_area.dock_area.docks)
|
||||
|
||||
|
||||
def test_add_remove_bec_figure_to_dock(bec_dock_area):
|
||||
d0 = bec_dock_area.new()
|
||||
fig = d0.new("BECFigure")
|
||||
plt = fig.plot(x_name="samx", y_name="bpm4i")
|
||||
im = fig.image("eiger")
|
||||
mm = fig.motor_map("samx", "samy")
|
||||
mw = fig.multi_waveform("waveform1d")
|
||||
|
||||
assert len(bec_dock_area.dock_area.docks) == 1
|
||||
assert len(d0.elements) == 1
|
||||
assert len(d0.element_list) == 1
|
||||
assert len(fig.widgets) == 4
|
||||
|
||||
assert fig.config.widget_class == "BECFigure"
|
||||
assert plt.config.widget_class == "BECWaveform"
|
||||
assert im.config.widget_class == "BECImageShow"
|
||||
assert mm.config.widget_class == "BECMotorMap"
|
||||
assert mw.config.widget_class == "BECMultiWaveform"
|
||||
|
||||
|
||||
def test_close_docks(bec_dock_area, qtbot):
|
||||
d0 = bec_dock_area.new(name="dock_0")
|
||||
d1 = bec_dock_area.new(name="dock_1")
|
||||
|
@ -1,275 +0,0 @@
|
||||
# pylint: disable=missing-function-docstring, missing-module-docstring, unused-import
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from bec_widgets.widgets.containers.figure import BECFigure
|
||||
from bec_widgets.widgets.containers.figure.plots.image.image import BECImageShow
|
||||
from bec_widgets.widgets.containers.figure.plots.motor_map.motor_map import BECMotorMap
|
||||
from bec_widgets.widgets.containers.figure.plots.multi_waveform.multi_waveform import (
|
||||
BECMultiWaveform,
|
||||
)
|
||||
from bec_widgets.widgets.containers.figure.plots.waveform.waveform import BECWaveform
|
||||
|
||||
from .client_mocks import mocked_client
|
||||
from .conftest import create_widget
|
||||
|
||||
|
||||
def test_bec_figure_init(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
assert bec_figure is not None
|
||||
assert bec_figure.client is not None
|
||||
assert isinstance(bec_figure, BECFigure)
|
||||
assert bec_figure.config.widget_class == "BECFigure"
|
||||
|
||||
|
||||
def test_bec_figure_init_with_config(mocked_client):
|
||||
config = {"widget_class": "BECFigure", "gui_id": "test_gui_id", "theme": "dark"}
|
||||
widget = BECFigure(client=mocked_client, config=config)
|
||||
assert widget.config.gui_id == "test_gui_id"
|
||||
assert widget.config.theme == "dark"
|
||||
|
||||
|
||||
def test_bec_figure_add_remove_plot(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
initial_count = len(bec_figure._widgets)
|
||||
|
||||
# Adding 3 widgets - 2 WaveformBase and 1 PlotBase
|
||||
w0 = bec_figure.plot(new=True)
|
||||
w1 = bec_figure.plot(new=True)
|
||||
w2 = bec_figure.add_widget(widget_type="BECPlotBase")
|
||||
|
||||
# Check if the widgets were added
|
||||
assert len(bec_figure._widgets) == initial_count + 3
|
||||
assert w0.gui_id in bec_figure._widgets
|
||||
assert w1.gui_id in bec_figure._widgets
|
||||
assert w2.gui_id in bec_figure._widgets
|
||||
assert bec_figure._widgets[w0.gui_id].config.widget_class == "BECWaveform"
|
||||
assert bec_figure._widgets[w1.gui_id].config.widget_class == "BECWaveform"
|
||||
assert bec_figure._widgets[w2.gui_id].config.widget_class == "BECPlotBase"
|
||||
|
||||
# Check accessing positions by the grid in figure
|
||||
assert bec_figure[0, 0] == w0
|
||||
assert bec_figure[1, 0] == w1
|
||||
assert bec_figure[2, 0] == w2
|
||||
|
||||
# Removing 1 widget
|
||||
bec_figure.remove(widget_id=w0.gui_id)
|
||||
assert len(bec_figure._widgets) == initial_count + 2
|
||||
assert w0.gui_id not in bec_figure._widgets
|
||||
assert w2.gui_id in bec_figure._widgets
|
||||
assert bec_figure._widgets[w1.gui_id].config.widget_class == "BECWaveform"
|
||||
|
||||
|
||||
def test_add_different_types_of_widgets(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
plt = bec_figure.plot(x_name="samx", y_name="bpm4i")
|
||||
im = bec_figure.image("eiger")
|
||||
motor_map = bec_figure.motor_map("samx", "samy")
|
||||
multi_waveform = bec_figure.multi_waveform("waveform")
|
||||
|
||||
assert plt.__class__ == BECWaveform
|
||||
assert im.__class__ == BECImageShow
|
||||
assert motor_map.__class__ == BECMotorMap
|
||||
assert multi_waveform.__class__ == BECMultiWaveform
|
||||
|
||||
|
||||
def test_access_widgets_access_errors(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
bec_figure.plot(row=0, col=0)
|
||||
|
||||
# access widget by non-existent coordinates
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
bec_figure[0, 2]
|
||||
assert "No widget at coordinates (0, 2)" in str(excinfo.value)
|
||||
|
||||
# access widget by non-existent widget_id
|
||||
with pytest.raises(KeyError) as excinfo:
|
||||
bec_figure["non_existent_widget"]
|
||||
assert "Widget with id 'non_existent_widget' not found" in str(excinfo.value)
|
||||
|
||||
# access widget by wrong type
|
||||
with pytest.raises(TypeError) as excinfo:
|
||||
bec_figure[1.2]
|
||||
assert (
|
||||
"Key must be a string (widget id) or a tuple of two integers (grid coordinates)"
|
||||
in str(excinfo.value)
|
||||
)
|
||||
|
||||
|
||||
def test_add_plot_to_occupied_position(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
bec_figure.plot(row=0, col=0)
|
||||
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
bec_figure.plot(row=0, col=0, new=True)
|
||||
assert "Position at row 0 and column 0 is already occupied." in str(excinfo.value)
|
||||
|
||||
|
||||
def test_remove_plots(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot(row=0, col=0)
|
||||
w2 = bec_figure.plot(row=0, col=1)
|
||||
w3 = bec_figure.plot(row=1, col=0)
|
||||
w4 = bec_figure.plot(row=1, col=1)
|
||||
|
||||
assert bec_figure[0, 0] == w1
|
||||
assert bec_figure[0, 1] == w2
|
||||
assert bec_figure[1, 0] == w3
|
||||
assert bec_figure[1, 1] == w4
|
||||
|
||||
# remove by coordinates
|
||||
bec_figure[0, 0].remove()
|
||||
assert w1.gui_id not in bec_figure._widgets
|
||||
|
||||
# remove by widget_id
|
||||
bec_figure.remove(widget_id=w2.gui_id)
|
||||
assert w2.gui_id not in bec_figure._widgets
|
||||
|
||||
# remove by widget object
|
||||
w3.remove()
|
||||
assert w3.gui_id not in bec_figure._widgets
|
||||
|
||||
# check the remaining widget 4
|
||||
assert bec_figure[0, 0] == w4
|
||||
assert bec_figure[w4.gui_id] == w4
|
||||
assert w4.gui_id in bec_figure._widgets
|
||||
assert len(bec_figure._widgets) == 1
|
||||
|
||||
|
||||
def test_remove_plots_by_coordinates_ints(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot(row=0, col=0)
|
||||
w2 = bec_figure.plot(row=0, col=1)
|
||||
|
||||
bec_figure.remove(row=0, col=0)
|
||||
assert w1.gui_id not in bec_figure._widgets
|
||||
assert w2.gui_id in bec_figure._widgets
|
||||
assert bec_figure[0, 0] == w2
|
||||
assert len(bec_figure._widgets) == 1
|
||||
|
||||
|
||||
def test_remove_plots_by_coordinates_tuple(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot(row=0, col=0)
|
||||
w2 = bec_figure.plot(row=0, col=1)
|
||||
|
||||
bec_figure.remove(coordinates=(0, 0))
|
||||
assert w1.gui_id not in bec_figure._widgets
|
||||
assert w2.gui_id in bec_figure._widgets
|
||||
assert bec_figure[0, 0] == w2
|
||||
assert len(bec_figure._widgets) == 1
|
||||
|
||||
|
||||
def test_remove_plot_by_id_error(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
bec_figure.plot()
|
||||
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
bec_figure.remove(widget_id="non_existent_widget")
|
||||
assert "Widget with ID 'non_existent_widget' does not exist." in str(excinfo.value)
|
||||
|
||||
|
||||
def test_remove_plot_by_coordinates_error(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
bec_figure.plot(row=0, col=0)
|
||||
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
bec_figure.remove(0, 1)
|
||||
assert "No widget at coordinates (0, 1)" in str(excinfo.value)
|
||||
|
||||
|
||||
def test_remove_plot_by_providing_nothing(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
bec_figure.plot(row=0, col=0)
|
||||
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
bec_figure.remove()
|
||||
assert "Must provide either widget_id or coordinates for removal." in str(excinfo.value)
|
||||
|
||||
|
||||
# def test_change_theme(bec_figure): #TODO do no work at python 3.12
|
||||
# bec_figure.change_theme("dark")
|
||||
# assert bec_figure.config.theme == "dark"
|
||||
# assert bec_figure.backgroundBrush().color().name() == "#000000"
|
||||
# bec_figure.change_theme("light")
|
||||
# assert bec_figure.config.theme == "light"
|
||||
# assert bec_figure.backgroundBrush().color().name() == "#ffffff"
|
||||
# bec_figure.change_theme("dark")
|
||||
# assert bec_figure.config.theme == "dark"
|
||||
# assert bec_figure.backgroundBrush().color().name() == "#000000"
|
||||
|
||||
|
||||
def test_change_layout(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot(row=0, col=0)
|
||||
w2 = bec_figure.plot(row=0, col=1)
|
||||
w3 = bec_figure.plot(row=1, col=0)
|
||||
w4 = bec_figure.plot(row=1, col=1)
|
||||
|
||||
bec_figure.change_layout(max_columns=1)
|
||||
|
||||
assert np.shape(bec_figure.grid) == (4, 1)
|
||||
assert bec_figure[0, 0] == w1
|
||||
assert bec_figure[1, 0] == w2
|
||||
assert bec_figure[2, 0] == w3
|
||||
assert bec_figure[3, 0] == w4
|
||||
|
||||
bec_figure.change_layout(max_rows=1)
|
||||
|
||||
assert np.shape(bec_figure.grid) == (1, 4)
|
||||
assert bec_figure[0, 0] == w1
|
||||
assert bec_figure[0, 1] == w2
|
||||
assert bec_figure[0, 2] == w3
|
||||
assert bec_figure[0, 3] == w4
|
||||
|
||||
|
||||
def test_clear_all(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
bec_figure.plot(row=0, col=0)
|
||||
bec_figure.plot(row=0, col=1)
|
||||
bec_figure.plot(row=1, col=0)
|
||||
bec_figure.plot(row=1, col=1)
|
||||
|
||||
bec_figure.clear_all()
|
||||
|
||||
assert len(bec_figure._widgets) == 0
|
||||
assert np.shape(bec_figure.grid) == (0,)
|
||||
|
||||
|
||||
def test_shortcuts(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
plt = bec_figure.plot(x_name="samx", y_name="bpm4i")
|
||||
im = bec_figure.image("eiger")
|
||||
motor_map = bec_figure.motor_map("samx", "samy")
|
||||
|
||||
assert plt.config.widget_class == "BECWaveform"
|
||||
assert plt.__class__ == BECWaveform
|
||||
assert im.config.widget_class == "BECImageShow"
|
||||
assert im.__class__ == BECImageShow
|
||||
assert motor_map.config.widget_class == "BECMotorMap"
|
||||
assert motor_map.__class__ == BECMotorMap
|
||||
|
||||
|
||||
def test_plot_access_factory(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
plt_00 = bec_figure.plot(x_name="samx", y_name="bpm4i")
|
||||
plt_01 = bec_figure.plot(x_name="samx", y_name="bpm4i", row=0, col=1)
|
||||
plt_10 = bec_figure.plot(new=True)
|
||||
|
||||
assert bec_figure.widget_list[0] == plt_00
|
||||
assert bec_figure.widget_list[1] == plt_01
|
||||
assert bec_figure.widget_list[2] == plt_10
|
||||
assert bec_figure.axes(row=0, col=0) == plt_00
|
||||
assert bec_figure.axes(row=0, col=1) == plt_01
|
||||
assert bec_figure.axes(row=1, col=0) == plt_10
|
||||
|
||||
assert len(plt_00.curves) == 1
|
||||
assert len(plt_01.curves) == 1
|
||||
assert len(plt_10.curves) == 0
|
||||
|
||||
# update plt_00
|
||||
bec_figure.plot(x_name="samx", y_name="bpm3a")
|
||||
bec_figure.plot(x=[1, 2, 3], y=[1, 2, 3], row=0, col=0)
|
||||
|
||||
assert len(plt_00.curves) == 3
|
@ -1,97 +0,0 @@
|
||||
# pylint: disable=missing-function-docstring, missing-module-docstring, unused-import
|
||||
|
||||
import numpy as np
|
||||
from bec_lib import messages
|
||||
|
||||
from bec_widgets.widgets.containers.figure import BECFigure
|
||||
|
||||
from .client_mocks import mocked_client
|
||||
from .conftest import create_widget
|
||||
|
||||
|
||||
def test_on_image_update(qtbot, mocked_client):
|
||||
bec_image_show = create_widget(qtbot, BECFigure, client=mocked_client).image("eiger")
|
||||
data = np.random.rand(100, 100)
|
||||
msg = messages.DeviceMonitor2DMessage(device="eiger", data=data, metadata={"scan_id": "12345"})
|
||||
bec_image_show.on_image_update(msg.content, msg.metadata)
|
||||
img = bec_image_show.images[0]
|
||||
assert np.array_equal(img.get_data(), data)
|
||||
|
||||
|
||||
def test_autorange_on_image_update(qtbot, mocked_client):
|
||||
bec_image_show = create_widget(qtbot, BECFigure, client=mocked_client).image("eiger")
|
||||
# Check if autorange mode "mean" works, should be default
|
||||
data = np.random.rand(100, 100)
|
||||
msg = messages.DeviceMonitor2DMessage(device="eiger", data=data, metadata={"scan_id": "12345"})
|
||||
bec_image_show.on_image_update(msg.content, msg.metadata)
|
||||
img = bec_image_show.images[0]
|
||||
assert np.array_equal(img.get_data(), data)
|
||||
vmin = max(np.mean(data) - 2 * np.std(data), 0)
|
||||
vmax = np.mean(data) + 2 * np.std(data)
|
||||
assert np.isclose(img.color_bar.getLevels(), (vmin, vmax), rtol=(1e-5, 1e-5)).all()
|
||||
# Test general update with autorange True, mode "max"
|
||||
bec_image_show.set_autorange_mode("max")
|
||||
bec_image_show.on_image_update(msg.content, msg.metadata)
|
||||
img = bec_image_show.images[0]
|
||||
vmin = np.min(data)
|
||||
vmax = np.max(data)
|
||||
assert np.array_equal(img.get_data(), data)
|
||||
assert np.isclose(img.color_bar.getLevels(), (vmin, vmax), rtol=(1e-5, 1e-5)).all()
|
||||
# Change the input data, and switch to autorange False, colormap levels should stay untouched
|
||||
data *= 100
|
||||
msg = messages.DeviceMonitor2DMessage(device="eiger", data=data, metadata={"scan_id": "12345"})
|
||||
bec_image_show.set_autorange(False)
|
||||
bec_image_show.on_image_update(msg.content, msg.metadata)
|
||||
img = bec_image_show.images[0]
|
||||
assert np.array_equal(img.get_data(), data)
|
||||
assert np.isclose(img.color_bar.getLevels(), (vmin, vmax), rtol=(1e-3, 1e-3)).all()
|
||||
# Reactivate autorange, should now scale the new data
|
||||
bec_image_show.set_autorange(True)
|
||||
bec_image_show.set_autorange_mode("mean")
|
||||
bec_image_show.on_image_update(msg.content, msg.metadata)
|
||||
img = bec_image_show.images[0]
|
||||
vmin = max(np.mean(data) - 2 * np.std(data), 0)
|
||||
vmax = np.mean(data) + 2 * np.std(data)
|
||||
assert np.isclose(img.color_bar.getLevels(), (vmin, vmax), rtol=(1e-5, 1e-5)).all()
|
||||
|
||||
|
||||
def test_on_image_update_variable_length(qtbot, mocked_client):
|
||||
"""
|
||||
Test the on_image_update slot with data arrays of varying lengths for 'device_monitor_1d' image type.
|
||||
"""
|
||||
# Create the widget and set image_type to 'device_monitor_1d'
|
||||
bec_image_show = create_widget(qtbot, BECFigure, client=mocked_client).image("waveform1d", "1d")
|
||||
|
||||
# Generate data arrays of varying lengths
|
||||
data_lengths = [10, 15, 12, 20, 5, 8, 1, 21]
|
||||
data_arrays = [np.random.rand(length) for length in data_lengths]
|
||||
|
||||
# Simulate sending messages with these data arrays
|
||||
device = "waveform1d"
|
||||
for data in data_arrays:
|
||||
msg = messages.DeviceMonitor1DMessage(
|
||||
device=device, data=data, metadata={"scan_id": "12345"}
|
||||
)
|
||||
bec_image_show.on_image_update(msg.content, msg.metadata)
|
||||
|
||||
# After processing all data, retrieve the image and its data
|
||||
img = bec_image_show.images[0]
|
||||
image_buffer = img.get_data()
|
||||
|
||||
# The image_buffer should be a 2D array with number of rows equal to number of data arrays
|
||||
# and number of columns equal to the maximum data length
|
||||
expected_num_rows = len(data_arrays)
|
||||
expected_num_cols = max(data_lengths)
|
||||
assert image_buffer.shape == (
|
||||
expected_num_rows,
|
||||
expected_num_cols,
|
||||
), f"Expected image buffer shape {(expected_num_rows, expected_num_cols)}, got {image_buffer.shape}"
|
||||
|
||||
# Check that each row in image_buffer corresponds to the padded data arrays
|
||||
for i, data in enumerate(data_arrays):
|
||||
padded_data = np.pad(
|
||||
data, (0, expected_num_cols - len(data)), mode="constant", constant_values=0
|
||||
)
|
||||
assert np.array_equal(
|
||||
image_buffer[i], padded_data
|
||||
), f"Row {i} in image buffer does not match expected padded data"
|
@ -1,282 +0,0 @@
|
||||
import numpy as np
|
||||
from bec_lib.messages import DeviceMessage
|
||||
|
||||
from bec_widgets.widgets.containers.figure import BECFigure
|
||||
from bec_widgets.widgets.containers.figure.plots.motor_map.motor_map import MotorMapConfig
|
||||
from bec_widgets.widgets.containers.figure.plots.waveform.waveform_curve import SignalData
|
||||
|
||||
from .client_mocks import mocked_client
|
||||
from .conftest import create_widget
|
||||
|
||||
|
||||
def test_motor_map_init(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
default_config = MotorMapConfig(widget_class="BECMotorMap")
|
||||
|
||||
mm = bec_figure.motor_map(config=default_config.model_dump())
|
||||
default_config.gui_id = mm.gui_id
|
||||
|
||||
assert mm.config == default_config
|
||||
|
||||
|
||||
def test_motor_map_change_motors(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
mm = bec_figure.motor_map("samx", "samy")
|
||||
|
||||
assert mm.motor_x == "samx"
|
||||
assert mm.motor_y == "samy"
|
||||
assert mm.config.signals.x == SignalData(name="samx", entry="samx", limits=[-10, 10])
|
||||
assert mm.config.signals.y == SignalData(name="samy", entry="samy", limits=[-5, 5])
|
||||
|
||||
mm.change_motors("samx", "samz")
|
||||
|
||||
assert mm.config.signals.x == SignalData(name="samx", entry="samx", limits=[-10, 10])
|
||||
assert mm.config.signals.y == SignalData(name="samz", entry="samz", limits=[-8, 8])
|
||||
|
||||
|
||||
def test_motor_map_get_limits(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
mm = bec_figure.motor_map("samx", "samy")
|
||||
expected_limits = {"samx": [-10, 10], "samy": [-5, 5]}
|
||||
|
||||
for motor_name, expected_limit in expected_limits.items():
|
||||
actual_limit = mm._get_motor_limit(motor_name)
|
||||
assert actual_limit == expected_limit
|
||||
|
||||
|
||||
def test_motor_map_get_init_position(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
mm = bec_figure.motor_map("samx", "samy")
|
||||
mm.set_precision(2)
|
||||
|
||||
motor_map_dev = mm.client.device_manager.devices
|
||||
|
||||
expected_positions = {
|
||||
("samx", "samx"): motor_map_dev["samx"].read()["samx"]["value"],
|
||||
("samy", "samy"): motor_map_dev["samy"].read()["samy"]["value"],
|
||||
}
|
||||
|
||||
for (motor_name, entry), expected_position in expected_positions.items():
|
||||
actual_position = mm._get_motor_init_position(motor_name, entry, 2)
|
||||
assert actual_position == expected_position
|
||||
|
||||
|
||||
def test_motor_movement_updates_position_and_database(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
mm = bec_figure.motor_map("samx", "samy")
|
||||
motor_map_dev = mm.client.device_manager.devices
|
||||
|
||||
init_positions = {
|
||||
"samx": [motor_map_dev["samx"].read()["samx"]["value"]],
|
||||
"samy": [motor_map_dev["samy"].read()["samy"]["value"]],
|
||||
}
|
||||
|
||||
mm.change_motors("samx", "samy")
|
||||
|
||||
assert mm.database_buffer["x"] == init_positions["samx"]
|
||||
assert mm.database_buffer["y"] == init_positions["samy"]
|
||||
|
||||
# Simulate motor movement for 'samx' only
|
||||
new_position_samx = 4.0
|
||||
msg = DeviceMessage(signals={"samx": {"value": new_position_samx}}, metadata={})
|
||||
mm.on_device_readback(msg.content, msg.metadata)
|
||||
|
||||
init_positions["samx"].append(new_position_samx)
|
||||
init_positions["samy"].append(init_positions["samy"][-1])
|
||||
# Verify database update for 'samx'
|
||||
assert mm.database_buffer["x"] == init_positions["samx"]
|
||||
|
||||
# Verify 'samy' retains its last known position
|
||||
assert mm.database_buffer["y"] == init_positions["samy"]
|
||||
|
||||
|
||||
def test_scatter_plot_rendering(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
mm = bec_figure.motor_map("samx", "samy")
|
||||
motor_map_dev = mm.client.device_manager.devices
|
||||
|
||||
init_positions = {
|
||||
"samx": [motor_map_dev["samx"].read()["samx"]["value"]],
|
||||
"samy": [motor_map_dev["samy"].read()["samy"]["value"]],
|
||||
}
|
||||
|
||||
mm.change_motors("samx", "samy")
|
||||
|
||||
# Simulate motor movement for 'samx' only
|
||||
new_position_samx = 4.0
|
||||
msg = DeviceMessage(signals={"samx": {"value": new_position_samx}}, metadata={})
|
||||
mm.on_device_readback(msg.content, msg.metadata)
|
||||
mm._update_plot()
|
||||
|
||||
# Get the scatter plot item
|
||||
scatter_plot_item = mm.plot_components["scatter"]
|
||||
|
||||
# Check the scatter plot item properties
|
||||
assert len(scatter_plot_item.data) > 0, "Scatter plot data is empty"
|
||||
x_data = scatter_plot_item.data["x"]
|
||||
y_data = scatter_plot_item.data["y"]
|
||||
assert x_data[-1] == new_position_samx, "Scatter plot X data not updated correctly"
|
||||
assert (
|
||||
y_data[-1] == init_positions["samy"][-1]
|
||||
), "Scatter plot Y data should retain last known position"
|
||||
|
||||
|
||||
def test_plot_visualization_consistency(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
mm = bec_figure.motor_map("samx", "samy")
|
||||
mm.change_motors("samx", "samy")
|
||||
# Simulate updating the plot with new data
|
||||
msg = DeviceMessage(signals={"samx": {"value": 5}}, metadata={})
|
||||
mm.on_device_readback(msg.content, msg.metadata)
|
||||
msg = DeviceMessage(signals={"samy": {"value": 9}}, metadata={})
|
||||
mm.on_device_readback(msg.content, msg.metadata)
|
||||
mm._update_plot()
|
||||
|
||||
scatter_plot_item = mm.plot_components["scatter"]
|
||||
|
||||
# Check if the scatter plot reflects the new data correctly
|
||||
assert (
|
||||
scatter_plot_item.data["x"][-1] == 5 and scatter_plot_item.data["y"][-1] == 9
|
||||
), "Plot not updated correctly with new data"
|
||||
|
||||
|
||||
def test_change_background_value(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
mm = bec_figure.motor_map("samx", "samy")
|
||||
|
||||
assert mm.config.background_value == 25
|
||||
assert np.all(mm.plot_components["limit_map"].image == 25.0)
|
||||
|
||||
mm.set_background_value(50)
|
||||
qtbot.wait(200)
|
||||
|
||||
assert mm.config.background_value == 50
|
||||
assert np.all(mm.plot_components["limit_map"].image == 50.0)
|
||||
|
||||
|
||||
def test_motor_map_init_from_config(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
config = {
|
||||
"widget_class": "BECMotorMap",
|
||||
"gui_id": "mm_id",
|
||||
"parent_id": bec_figure.gui_id,
|
||||
"row": 0,
|
||||
"col": 0,
|
||||
"axis": {
|
||||
"title": "Motor position: (-0.0, 0.0)",
|
||||
"title_size": None,
|
||||
"x_label": "Motor X (samx)",
|
||||
"x_label_size": None,
|
||||
"y_label": "Motor Y (samy)",
|
||||
"y_label_size": None,
|
||||
"legend_label_size": None,
|
||||
"x_scale": "linear",
|
||||
"y_scale": "linear",
|
||||
"x_lim": None,
|
||||
"y_lim": None,
|
||||
"x_grid": True,
|
||||
"y_grid": True,
|
||||
"outer_axes": False,
|
||||
},
|
||||
"signals": {
|
||||
"source": "device_readback",
|
||||
"x": {
|
||||
"name": "samx",
|
||||
"entry": "samx",
|
||||
"unit": None,
|
||||
"modifier": None,
|
||||
"limits": [-10.0, 10.0],
|
||||
},
|
||||
"y": {
|
||||
"name": "samy",
|
||||
"entry": "samy",
|
||||
"unit": None,
|
||||
"modifier": None,
|
||||
"limits": [-5.0, 5.0],
|
||||
},
|
||||
"z": None,
|
||||
"dap": None,
|
||||
},
|
||||
"color": (255, 255, 255, 255),
|
||||
"scatter_size": 5,
|
||||
"max_points": 50,
|
||||
"num_dim_points": 10,
|
||||
"precision": 5,
|
||||
"background_value": 50,
|
||||
}
|
||||
mm = bec_figure.motor_map(config=config)
|
||||
config["gui_id"] = mm.gui_id
|
||||
|
||||
assert mm._config_dict == config
|
||||
|
||||
|
||||
def test_motor_map_set_scatter_size(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
mm = bec_figure.motor_map("samx", "samy")
|
||||
|
||||
assert mm.config.scatter_size == 5
|
||||
assert mm.plot_components["scatter"].opts["size"] == 5
|
||||
|
||||
mm.set_scatter_size(10)
|
||||
qtbot.wait(200)
|
||||
|
||||
assert mm.config.scatter_size == 10
|
||||
assert mm.plot_components["scatter"].opts["size"] == 10
|
||||
|
||||
|
||||
def test_motor_map_change_precision(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
mm = bec_figure.motor_map("samx", "samy")
|
||||
|
||||
assert mm.config.precision == 2
|
||||
mm.set_precision(10)
|
||||
assert mm.config.precision == 10
|
||||
|
||||
|
||||
def test_motor_map_set_color(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
mm = bec_figure.motor_map("samx", "samy")
|
||||
|
||||
assert mm.config.color == (255, 255, 255, 255)
|
||||
|
||||
mm.set_color((0, 0, 0, 255))
|
||||
qtbot.wait(200)
|
||||
assert mm.config.color == (0, 0, 0, 255)
|
||||
|
||||
|
||||
def test_motor_map_get_data_max_points(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
mm = bec_figure.motor_map("samx", "samy")
|
||||
motor_map_dev = mm.client.device_manager.devices
|
||||
|
||||
init_positions = {
|
||||
"samx": [motor_map_dev["samx"].read()["samx"]["value"]],
|
||||
"samy": [motor_map_dev["samy"].read()["samy"]["value"]],
|
||||
}
|
||||
msg = DeviceMessage(signals={"samx": {"value": 5.0}}, metadata={})
|
||||
mm.on_device_readback(msg.content, msg.metadata)
|
||||
msg = DeviceMessage(signals={"samy": {"value": 9.0}}, metadata={})
|
||||
mm.on_device_readback(msg.content, msg.metadata)
|
||||
msg = DeviceMessage(signals={"samx": {"value": 6.0}}, metadata={})
|
||||
mm.on_device_readback(msg.content, msg.metadata)
|
||||
msg = DeviceMessage(signals={"samy": {"value": 7.0}}, metadata={})
|
||||
mm.on_device_readback(msg.content, msg.metadata)
|
||||
|
||||
expected_x = [init_positions["samx"][-1], 5.0, 5.0, 6.0, 6.0]
|
||||
expected_y = [init_positions["samy"][-1], init_positions["samy"][-1], 9.0, 9.0, 7.0]
|
||||
get_data = mm.get_data()
|
||||
|
||||
assert mm.database_buffer["x"] == expected_x
|
||||
assert mm.database_buffer["y"] == expected_y
|
||||
assert get_data["x"] == expected_x
|
||||
assert get_data["y"] == expected_y
|
||||
|
||||
mm.set_max_points(3)
|
||||
qtbot.wait(200)
|
||||
get_data = mm.get_data()
|
||||
assert len(get_data["x"]) == 3
|
||||
assert len(get_data["y"]) == 3
|
||||
assert get_data["x"] == expected_x[-3:]
|
||||
assert get_data["y"] == expected_y[-3:]
|
||||
assert mm.database_buffer["x"] == expected_x[-3:]
|
||||
assert mm.database_buffer["y"] == expected_y[-3:]
|
@ -1,253 +0,0 @@
|
||||
from unittest import mock
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
from bec_lib.endpoints import messages
|
||||
|
||||
from bec_widgets.utils import Colors
|
||||
from bec_widgets.widgets.containers.figure import BECFigure
|
||||
|
||||
from .client_mocks import mocked_client
|
||||
from .conftest import create_widget
|
||||
|
||||
|
||||
def test_set_monitor(qtbot, mocked_client):
|
||||
"""Test that setting the monitor connects the appropriate slot."""
|
||||
# Create a BECFigure
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
# Add a multi_waveform plot
|
||||
multi_waveform = bec_figure.multi_waveform()
|
||||
multi_waveform.set_monitor("waveform1d")
|
||||
|
||||
assert multi_waveform.config.monitor == "waveform1d"
|
||||
assert multi_waveform.connected is True
|
||||
|
||||
data_0 = np.random.rand(100)
|
||||
msg = messages.DeviceMonitor1DMessage(
|
||||
device="waveform1d", data=data_0, metadata={"scan_id": "12345"}
|
||||
)
|
||||
multi_waveform.on_monitor_1d_update(msg.content, msg.metadata)
|
||||
data_waveform = multi_waveform.get_all_data()
|
||||
print(data_waveform)
|
||||
|
||||
assert len(data_waveform) == 1
|
||||
assert np.array_equal(data_waveform["curve_0"]["y"], data_0)
|
||||
|
||||
data_1 = np.random.rand(100)
|
||||
msg = messages.DeviceMonitor1DMessage(
|
||||
device="waveform1d", data=data_1, metadata={"scan_id": "12345"}
|
||||
)
|
||||
multi_waveform.on_monitor_1d_update(msg.content, msg.metadata)
|
||||
|
||||
data_waveform = multi_waveform.get_all_data()
|
||||
assert len(data_waveform) == 2
|
||||
assert np.array_equal(data_waveform["curve_0"]["y"], data_0)
|
||||
assert np.array_equal(data_waveform["curve_1"]["y"], data_1)
|
||||
|
||||
|
||||
def test_on_monitor_1d_update(qtbot, mocked_client):
|
||||
"""Test that data updates add curves to the plot."""
|
||||
# Create a BECFigure
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
# Add a multi_waveform plot
|
||||
multi_waveform = bec_figure.multi_waveform()
|
||||
multi_waveform.set_monitor("test_monitor")
|
||||
|
||||
# Simulate receiving data updates
|
||||
test_data = np.array([1, 2, 3, 4, 5])
|
||||
msg = {"data": test_data}
|
||||
metadata = {"scan_id": "scan_1"}
|
||||
|
||||
# Call the on_monitor_1d_update method
|
||||
multi_waveform.on_monitor_1d_update(msg, metadata)
|
||||
|
||||
# Check that a curve has been added
|
||||
assert len(multi_waveform.curves) == 1
|
||||
# Check that the data in the curve is correct
|
||||
curve = multi_waveform.curves[-1]
|
||||
x_data, y_data = curve.getData()
|
||||
assert np.array_equal(y_data, test_data)
|
||||
|
||||
# Simulate another data update
|
||||
test_data_2 = np.array([6, 7, 8, 9, 10])
|
||||
msg2 = {"data": test_data_2}
|
||||
metadata2 = {"scan_id": "scan_1"}
|
||||
|
||||
multi_waveform.on_monitor_1d_update(msg2, metadata2)
|
||||
|
||||
# Check that another curve has been added
|
||||
assert len(multi_waveform.curves) == 2
|
||||
# Check that the data in the curve is correct
|
||||
curve2 = multi_waveform.curves[-1]
|
||||
x_data2, y_data2 = curve2.getData()
|
||||
assert np.array_equal(y_data2, test_data_2)
|
||||
|
||||
|
||||
def test_set_curve_limit_no_flush(qtbot, mocked_client):
|
||||
"""Test set_curve_limit with flush_buffer=False."""
|
||||
# Create a BECFigure
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
# Add a multi_waveform plot
|
||||
multi_waveform = bec_figure.multi_waveform()
|
||||
multi_waveform.set_monitor("test_monitor")
|
||||
|
||||
# Simulate adding multiple curves
|
||||
for i in range(5):
|
||||
test_data = np.array([i, i + 1, i + 2])
|
||||
msg = {"data": test_data}
|
||||
metadata = {"scan_id": "scan_1"}
|
||||
multi_waveform.on_monitor_1d_update(msg, metadata)
|
||||
|
||||
# Check that there are 5 curves
|
||||
assert len(multi_waveform.curves) == 5
|
||||
# Set curve limit to 3 with flush_buffer=False
|
||||
multi_waveform.set_curve_limit(3, flush_buffer=False)
|
||||
|
||||
# Check that curves are hidden, but not removed
|
||||
assert len(multi_waveform.curves) == 5
|
||||
visible_curves = [curve for curve in multi_waveform.curves if curve.isVisible()]
|
||||
assert len(visible_curves) == 3
|
||||
# The first two curves should be hidden
|
||||
assert not multi_waveform.curves[0].isVisible()
|
||||
assert not multi_waveform.curves[1].isVisible()
|
||||
assert multi_waveform.curves[2].isVisible()
|
||||
assert multi_waveform.curves[3].isVisible()
|
||||
assert multi_waveform.curves[4].isVisible()
|
||||
|
||||
|
||||
def test_set_curve_limit_flush(qtbot, mocked_client):
|
||||
"""Test set_curve_limit with flush_buffer=True."""
|
||||
# Create a BECFigure
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
# Add a multi_waveform plot
|
||||
multi_waveform = bec_figure.multi_waveform()
|
||||
multi_waveform.set_monitor("test_monitor")
|
||||
|
||||
# Simulate adding multiple curves
|
||||
for i in range(5):
|
||||
test_data = np.array([i, i + 1, i + 2])
|
||||
msg = {"data": test_data}
|
||||
metadata = {"scan_id": "scan_1"}
|
||||
multi_waveform.on_monitor_1d_update(msg, metadata)
|
||||
|
||||
# Check that there are 5 curves
|
||||
assert len(multi_waveform.curves) == 5
|
||||
# Set curve limit to 3 with flush_buffer=True
|
||||
multi_waveform.set_curve_limit(3, flush_buffer=True)
|
||||
|
||||
# Check that only 3 curves remain
|
||||
assert len(multi_waveform.curves) == 3
|
||||
# The curves should be the last 3 added
|
||||
x_data, y_data = multi_waveform.curves[0].getData()
|
||||
assert np.array_equal(y_data, [2, 3, 4])
|
||||
x_data, y_data = multi_waveform.curves[1].getData()
|
||||
assert np.array_equal(y_data, [3, 4, 5])
|
||||
x_data, y_data = multi_waveform.curves[2].getData()
|
||||
assert np.array_equal(y_data, [4, 5, 6])
|
||||
|
||||
|
||||
def test_set_curve_highlight(qtbot, mocked_client):
|
||||
"""Test that the correct curve is highlighted."""
|
||||
# Create a BECFigure
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
# Add a multi_waveform plot
|
||||
multi_waveform = bec_figure.multi_waveform()
|
||||
multi_waveform.set_monitor("test_monitor")
|
||||
|
||||
# Simulate adding multiple curves
|
||||
for i in range(3):
|
||||
test_data = np.array([i, i + 1, i + 2])
|
||||
msg = {"data": test_data}
|
||||
metadata = {"scan_id": "scan_1"}
|
||||
multi_waveform.on_monitor_1d_update(msg, metadata)
|
||||
|
||||
# Set highlight_last_curve to False
|
||||
multi_waveform.highlight_last_curve = False
|
||||
multi_waveform.set_curve_highlight(1) # Highlight the second curve (index 1)
|
||||
|
||||
# Check that the second curve is highlighted
|
||||
visible_curves = [curve for curve in multi_waveform.curves if curve.isVisible()]
|
||||
# Reverse the list to match indexing in set_curve_highlight
|
||||
visible_curves = list(reversed(visible_curves))
|
||||
for i, curve in enumerate(visible_curves):
|
||||
pen = curve.opts["pen"]
|
||||
width = pen.width()
|
||||
if i == 1:
|
||||
# Highlighted curve should have width 5
|
||||
assert width == 5
|
||||
else:
|
||||
assert width == 1
|
||||
|
||||
|
||||
def test_set_opacity(qtbot, mocked_client):
|
||||
"""Test that setting opacity updates the curves."""
|
||||
# Create a BECFigure
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
# Add a multi_waveform plot
|
||||
multi_waveform = bec_figure.multi_waveform()
|
||||
multi_waveform.set_monitor("waveform1d")
|
||||
|
||||
# Simulate adding a curve
|
||||
test_data = np.array([1, 2, 3])
|
||||
msg = {"data": test_data}
|
||||
metadata = {"scan_id": "scan_1"}
|
||||
multi_waveform.on_monitor_1d_update(msg, metadata)
|
||||
|
||||
# Set opacity to 30
|
||||
multi_waveform.set_opacity(30)
|
||||
assert multi_waveform.config.opacity == 30
|
||||
|
||||
|
||||
def test_set_colormap(qtbot, mocked_client):
|
||||
"""Test that setting the colormap updates the curve colors."""
|
||||
# Create a BECFigure
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
# Add a multi_waveform plot
|
||||
multi_waveform = bec_figure.multi_waveform()
|
||||
multi_waveform.set_monitor("waveform1d")
|
||||
|
||||
# Simulate adding multiple curves
|
||||
for i in range(3):
|
||||
test_data = np.array([i, i + 1, i + 2])
|
||||
msg = {"data": test_data}
|
||||
metadata = {"scan_id": "scan_1"}
|
||||
multi_waveform.on_monitor_1d_update(msg, metadata)
|
||||
|
||||
# Set a new colormap
|
||||
multi_waveform.set_opacity(100)
|
||||
multi_waveform.set_colormap("viridis")
|
||||
# Check that the colors of the curves have changed accordingly
|
||||
visible_curves = [curve for curve in multi_waveform.curves if curve.isVisible()]
|
||||
# Get the colors applied
|
||||
colors = Colors.evenly_spaced_colors(colormap="viridis", num=len(visible_curves), format="HEX")
|
||||
for i, curve in enumerate(visible_curves):
|
||||
pen = curve.opts["pen"]
|
||||
pen_color = pen.color().name()
|
||||
expected_color = colors[i]
|
||||
# Compare pen color to expected color
|
||||
assert pen_color.lower() == expected_color.lower()
|
||||
|
||||
|
||||
def test_export_to_matplotlib(qtbot, mocked_client):
|
||||
"""Test that export_to_matplotlib can be called without errors."""
|
||||
try:
|
||||
import matplotlib
|
||||
except ImportError:
|
||||
pytest.skip("Matplotlib not installed")
|
||||
|
||||
# Create a BECFigure
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
# Add a multi_waveform plot
|
||||
multi_waveform = bec_figure.multi_waveform()
|
||||
multi_waveform.set_monitor("test_monitor")
|
||||
|
||||
# Simulate adding a curve
|
||||
test_data = np.array([1, 2, 3])
|
||||
msg = {"data": test_data}
|
||||
metadata = {"scan_id": "scan_1"}
|
||||
multi_waveform.on_monitor_1d_update(msg, metadata)
|
||||
|
||||
# Call export_to_matplotlib
|
||||
with mock.patch("pyqtgraph.exporters.MatplotlibExporter.export") as mock_export:
|
||||
multi_waveform.export_to_matplotlib()
|
||||
mock_export.assert_called_once()
|
@ -1,247 +0,0 @@
|
||||
# pylint: disable=missing-function-docstring, missing-module-docstring, unused-import
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
|
||||
from bec_widgets.widgets.containers.figure import BECFigure
|
||||
|
||||
from .client_mocks import mocked_client
|
||||
from .conftest import create_widget
|
||||
|
||||
|
||||
def test_init_plot_base(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
|
||||
assert plot_base is not None
|
||||
assert plot_base.config.widget_class == "BECPlotBase"
|
||||
assert plot_base.config.gui_id == plot_base.gui_id
|
||||
|
||||
|
||||
def test_plot_base_axes_by_separate_methods(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
|
||||
|
||||
plot_base.set_title("Test Title")
|
||||
plot_base.set_x_label("Test x Label")
|
||||
plot_base.set_y_label("Test y Label")
|
||||
plot_base.set_x_lim(1, 100)
|
||||
plot_base.set_y_lim(5, 500)
|
||||
plot_base.set_grid(True, True)
|
||||
plot_base.set_x_scale("log")
|
||||
plot_base.set_y_scale("log")
|
||||
|
||||
assert plot_base.plot_item.titleLabel.text == "Test Title"
|
||||
assert plot_base.config.axis.title == "Test Title"
|
||||
assert plot_base.plot_item.getAxis("bottom").labelText == "Test x Label"
|
||||
assert plot_base.config.axis.x_label == "Test x Label"
|
||||
assert plot_base.plot_item.getAxis("left").labelText == "Test y Label"
|
||||
assert plot_base.config.axis.y_label == "Test y Label"
|
||||
assert plot_base.config.axis.x_lim == (1, 100)
|
||||
assert plot_base.config.axis.y_lim == (5, 500)
|
||||
assert plot_base.plot_item.ctrl.xGridCheck.isChecked() == True
|
||||
assert plot_base.plot_item.ctrl.yGridCheck.isChecked() == True
|
||||
assert plot_base.plot_item.ctrl.logXCheck.isChecked() == True
|
||||
assert plot_base.plot_item.ctrl.logYCheck.isChecked() == True
|
||||
|
||||
# Check the font size by mocking the set functions
|
||||
# I struggled retrieving it from the QFont object directly
|
||||
# thus I mocked the set functions to check internally the functionality
|
||||
with (
|
||||
mock.patch.object(plot_base.plot_item, "setLabel") as mock_set_label,
|
||||
mock.patch.object(plot_base.plot_item, "setTitle") as mock_set_title,
|
||||
):
|
||||
plot_base.set_x_label("Test x Label", 20)
|
||||
plot_base.set_y_label("Test y Label", 16)
|
||||
assert mock_set_label.call_count == 2
|
||||
assert plot_base.config.axis.x_label_size == 20
|
||||
assert plot_base.config.axis.y_label_size == 16
|
||||
col = plot_base.get_text_color()
|
||||
calls = []
|
||||
style = {"color": col, "font-size": "20pt"}
|
||||
calls.append(mock.call("bottom", "Test x Label", **style))
|
||||
style = {"color": col, "font-size": "16pt"}
|
||||
calls.append(mock.call("left", "Test y Label", **style))
|
||||
assert mock_set_label.call_args_list == calls
|
||||
plot_base.set_title("Test Title", 16)
|
||||
style = {"color": col, "size": "16pt"}
|
||||
call = mock.call("Test Title", **style)
|
||||
assert mock_set_title.call_args == call
|
||||
|
||||
|
||||
def test_plot_base_axes_added_by_kwargs(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
|
||||
|
||||
plot_base.set(
|
||||
title="Test Title",
|
||||
x_label="Test x Label",
|
||||
y_label="Test y Label",
|
||||
x_lim=(1, 100),
|
||||
y_lim=(5, 500),
|
||||
x_scale="log",
|
||||
y_scale="log",
|
||||
)
|
||||
|
||||
assert plot_base.plot_item.titleLabel.text == "Test Title"
|
||||
assert plot_base.config.axis.title == "Test Title"
|
||||
assert plot_base.plot_item.getAxis("bottom").labelText == "Test x Label"
|
||||
assert plot_base.config.axis.x_label == "Test x Label"
|
||||
assert plot_base.plot_item.getAxis("left").labelText == "Test y Label"
|
||||
assert plot_base.config.axis.y_label == "Test y Label"
|
||||
assert plot_base.config.axis.x_lim == (1, 100)
|
||||
assert plot_base.config.axis.y_lim == (5, 500)
|
||||
assert plot_base.plot_item.ctrl.logXCheck.isChecked() == True
|
||||
assert plot_base.plot_item.ctrl.logYCheck.isChecked() == True
|
||||
|
||||
|
||||
def test_lock_aspect_ratio(qtbot, mocked_client):
|
||||
"""
|
||||
Test locking and unlocking the aspect ratio of the plot.
|
||||
"""
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
|
||||
|
||||
# Lock the aspect ratio
|
||||
plot_base.lock_aspect_ratio(True)
|
||||
assert plot_base.plot_item.vb.state["aspectLocked"] == 1
|
||||
|
||||
# Unlock the aspect ratio
|
||||
plot_base.lock_aspect_ratio(False)
|
||||
assert plot_base.plot_item.vb.state["aspectLocked"] == 0
|
||||
|
||||
|
||||
def test_set_auto_range(qtbot, mocked_client):
|
||||
"""
|
||||
Test enabling and disabling auto range for the plot.
|
||||
"""
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
|
||||
|
||||
# Enable auto range for both axes
|
||||
plot_base.set_auto_range(True, axis="xy")
|
||||
assert plot_base.plot_item.vb.state["autoRange"] == [True, True]
|
||||
|
||||
# Disable auto range for x-axis
|
||||
plot_base.set_auto_range(False, axis="x")
|
||||
assert plot_base.plot_item.vb.state["autoRange"] == [False, True]
|
||||
|
||||
# Disable auto range for y-axis
|
||||
plot_base.set_auto_range(False, axis="y")
|
||||
assert plot_base.plot_item.vb.state["autoRange"] == [False, False]
|
||||
|
||||
|
||||
def test_set_outer_axes(qtbot, mocked_client):
|
||||
"""
|
||||
Test showing and hiding the outer axes of the plot.
|
||||
"""
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
|
||||
|
||||
# Show outer axes
|
||||
plot_base.set_outer_axes(True)
|
||||
assert plot_base.plot_item.getAxis("top").isVisible()
|
||||
assert plot_base.plot_item.getAxis("right").isVisible()
|
||||
assert plot_base.config.axis.outer_axes is True
|
||||
|
||||
# Hide outer axes
|
||||
plot_base.set_outer_axes(False)
|
||||
assert not plot_base.plot_item.getAxis("top").isVisible()
|
||||
assert not plot_base.plot_item.getAxis("right").isVisible()
|
||||
assert plot_base.config.axis.outer_axes is False
|
||||
|
||||
|
||||
def test_toggle_crosshair(qtbot, mocked_client):
|
||||
"""
|
||||
Test toggling the crosshair on and off.
|
||||
"""
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
|
||||
|
||||
# Toggle crosshair on
|
||||
plot_base.toggle_crosshair()
|
||||
assert plot_base.crosshair is not None
|
||||
|
||||
# Toggle crosshair off
|
||||
plot_base.toggle_crosshair()
|
||||
assert plot_base.crosshair is None
|
||||
|
||||
|
||||
def test_invalid_scale_input(qtbot, mocked_client):
|
||||
"""
|
||||
Test setting an invalid scale for x and y axes.
|
||||
"""
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
plot_base.set_x_scale("invalid_scale")
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
plot_base.set_y_scale("invalid_scale")
|
||||
|
||||
|
||||
def test_set_x_lim_invalid_arguments(qtbot, mocked_client):
|
||||
"""
|
||||
Test passing invalid arguments to set_x_lim.
|
||||
"""
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
plot_base.set_x_lim(1)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
plot_base.set_x_lim((1, 2, 3))
|
||||
|
||||
|
||||
def test_set_y_lim_invalid_arguments(qtbot, mocked_client):
|
||||
"""
|
||||
Test passing invalid arguments to set_y_lim.
|
||||
"""
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
plot_base.set_y_lim(1)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
plot_base.set_y_lim((1, 2, 3))
|
||||
|
||||
|
||||
def test_remove_plot(qtbot, mocked_client):
|
||||
"""
|
||||
Test removing the plot widget from the figure.
|
||||
"""
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
with mock.patch.object(bec_figure, "remove") as mock_remove:
|
||||
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
|
||||
plot_base.remove()
|
||||
mock_remove.assert_called_once_with(widget_id=plot_base.gui_id)
|
||||
|
||||
|
||||
def test_add_fps_monitor(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
|
||||
|
||||
plot_base.enable_fps_monitor(True)
|
||||
|
||||
assert plot_base.fps_monitor is not None
|
||||
assert plot_base.fps_monitor.view_box is plot_base.plot_item.getViewBox()
|
||||
assert plot_base.fps_monitor.timer.isActive() == True
|
||||
assert plot_base.fps_monitor.timer.interval() == 1000
|
||||
assert plot_base.fps_monitor.sigFpsUpdate is not None
|
||||
assert plot_base.fps_monitor.sigFpsUpdate.connect is not None
|
||||
|
||||
|
||||
def test_hook_unhook_fps_monitor(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
|
||||
|
||||
plot_base.enable_fps_monitor(True)
|
||||
assert plot_base.fps_monitor is not None
|
||||
|
||||
plot_base.enable_fps_monitor(False)
|
||||
assert plot_base.fps_monitor is None
|
||||
|
||||
plot_base.enable_fps_monitor(True)
|
||||
assert plot_base.fps_monitor is not None
|
@ -6,7 +6,7 @@ def test_client_generator_classes():
|
||||
connector_cls_names = [cls.__name__ for cls in out.connector_classes]
|
||||
plugins = [cls.__name__ for cls in out.plugins]
|
||||
|
||||
assert "BECFigure" in connector_cls_names
|
||||
assert "BECWaveform" in connector_cls_names
|
||||
assert "Image" in connector_cls_names
|
||||
assert "Waveform" in connector_cls_names
|
||||
assert "BECDockArea" in plugins
|
||||
assert "BECWaveform" not in plugins
|
||||
assert "NonExisting" not in plugins
|
||||
|
@ -3,7 +3,7 @@ from unittest import mock
|
||||
import pytest
|
||||
|
||||
from bec_widgets.cli.server import _start_server
|
||||
from bec_widgets.widgets.containers.figure import BECFigure
|
||||
from bec_widgets.widgets.containers.dock import BECDockArea
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -20,9 +20,9 @@ def test_rpc_server_start_server_without_service_config(mocked_cli_server):
|
||||
"""
|
||||
mock_server, mock_config, _ = mocked_cli_server
|
||||
|
||||
_start_server("gui_id", BECFigure, config=None)
|
||||
_start_server("gui_id", BECDockArea, config=None)
|
||||
mock_server.assert_called_once_with(
|
||||
gui_id="gui_id", config=mock_config(), gui_class=BECFigure, gui_class_id="bec"
|
||||
gui_id="gui_id", config=mock_config(), gui_class=BECDockArea, gui_class_id="bec"
|
||||
)
|
||||
|
||||
|
||||
@ -39,7 +39,7 @@ def test_rpc_server_start_server_with_service_config(mocked_cli_server, config,
|
||||
"""
|
||||
mock_server, mock_config, _ = mocked_cli_server
|
||||
config = mock_config(**call_config)
|
||||
_start_server("gui_id", BECFigure, config=config)
|
||||
_start_server("gui_id", BECDockArea, config=config)
|
||||
mock_server.assert_called_once_with(
|
||||
gui_id="gui_id", config=config, gui_class=BECFigure, gui_class_id="bec"
|
||||
gui_id="gui_id", config=config, gui_class=BECDockArea, gui_class_id="bec"
|
||||
)
|
||||
|
@ -1,113 +1,114 @@
|
||||
import pytest
|
||||
from qtpy.QtCore import QPointF
|
||||
|
||||
from bec_widgets.widgets.containers.figure import BECFigure
|
||||
|
||||
from .client_mocks import mocked_client
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def plot_widget_with_arrow_item(qtbot, mocked_client):
|
||||
widget = BECFigure(client=mocked_client())
|
||||
qtbot.addWidget(widget)
|
||||
qtbot.waitExposed(widget)
|
||||
waveform = widget.plot()
|
||||
|
||||
yield waveform.arrow_item, waveform.plot_item
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def plot_widget_with_tick_item(qtbot, mocked_client):
|
||||
widget = BECFigure(client=mocked_client())
|
||||
qtbot.addWidget(widget)
|
||||
qtbot.waitExposed(widget)
|
||||
waveform = widget.plot()
|
||||
|
||||
yield waveform.tick_item, waveform.plot_item
|
||||
|
||||
|
||||
def test_arrow_item_add_to_plot(plot_widget_with_arrow_item):
|
||||
"""Test the add_to_plot method"""
|
||||
arrow_item, plot_item = plot_widget_with_arrow_item
|
||||
assert arrow_item.plot_item is not None
|
||||
assert arrow_item.plot_item.items == []
|
||||
arrow_item.add_to_plot()
|
||||
assert arrow_item.plot_item.items == [arrow_item.arrow_item]
|
||||
arrow_item.remove_from_plot()
|
||||
|
||||
|
||||
def test_arrow_item_set_position(plot_widget_with_arrow_item):
|
||||
"""Test the set_position method"""
|
||||
arrow_item, plot_item = plot_widget_with_arrow_item
|
||||
container = []
|
||||
|
||||
def signal_callback(tup: tuple):
|
||||
container.append(tup)
|
||||
|
||||
arrow_item.add_to_plot()
|
||||
arrow_item.position_changed.connect(signal_callback)
|
||||
arrow_item.set_position(pos=(1, 1))
|
||||
point = QPointF(1.0, 1.0)
|
||||
assert arrow_item.arrow_item.pos() == point
|
||||
arrow_item.set_position(pos=(2, 2))
|
||||
point = QPointF(2.0, 2.0)
|
||||
assert arrow_item.arrow_item.pos() == point
|
||||
assert container == [(1, 1), (2, 2)]
|
||||
arrow_item.remove_from_plot()
|
||||
|
||||
|
||||
def test_arrow_item_cleanup(plot_widget_with_arrow_item):
|
||||
"""Test cleanup procedure"""
|
||||
arrow_item, plot_item = plot_widget_with_arrow_item
|
||||
arrow_item.add_to_plot()
|
||||
assert arrow_item.item_on_plot is True
|
||||
arrow_item.cleanup()
|
||||
assert arrow_item.plot_item.items == []
|
||||
assert arrow_item.item_on_plot is False
|
||||
assert arrow_item.arrow_item is None
|
||||
|
||||
|
||||
def test_tick_item_add_to_plot(plot_widget_with_tick_item):
|
||||
"""Test the add_to_plot method"""
|
||||
tick_item, plot_item = plot_widget_with_tick_item
|
||||
assert tick_item.plot_item is not None
|
||||
assert tick_item.plot_item.items == []
|
||||
tick_item.add_to_plot()
|
||||
assert tick_item.plot_item.layout.itemAt(2, 1) == tick_item.tick_item
|
||||
assert tick_item.item_on_plot is True
|
||||
new_pos = plot_item.vb.geometry().bottom()
|
||||
pos = tick_item.tick.pos()
|
||||
new_pos = tick_item.tick_item.mapFromParent(QPointF(pos.x(), new_pos))
|
||||
assert new_pos.y() == pos.y()
|
||||
tick_item.remove_from_plot()
|
||||
|
||||
|
||||
def test_tick_item_set_position(plot_widget_with_tick_item):
|
||||
"""Test the set_position method"""
|
||||
tick_item, plot_item = plot_widget_with_tick_item
|
||||
container = []
|
||||
|
||||
def signal_callback(val: float):
|
||||
container.append(val)
|
||||
|
||||
tick_item.add_to_plot()
|
||||
tick_item.position_changed.connect(signal_callback)
|
||||
|
||||
tick_item.set_position(pos=1)
|
||||
assert tick_item._pos == 1
|
||||
tick_item.set_position(pos=2)
|
||||
assert tick_item._pos == 2
|
||||
assert container == [1.0, 2.0]
|
||||
tick_item.remove_from_plot()
|
||||
|
||||
|
||||
def test_tick_item_cleanup(plot_widget_with_tick_item):
|
||||
"""Test cleanup procedure"""
|
||||
tick_item, plot_item = plot_widget_with_tick_item
|
||||
tick_item.add_to_plot()
|
||||
assert tick_item.item_on_plot is True
|
||||
tick_item.cleanup()
|
||||
ticks = getattr(tick_item.plot_item.layout.itemAt(3, 1), "ticks", None)
|
||||
assert ticks == None
|
||||
assert tick_item.item_on_plot is False
|
||||
assert tick_item.tick_item is None
|
||||
# TODO temporary disabled until migrate tick and arrow items to new system
|
||||
# import pytest
|
||||
# from qtpy.QtCore import QPointF
|
||||
#
|
||||
# from bec_widgets.widgets.containers.figure import BECFigure
|
||||
#
|
||||
# from .client_mocks import mocked_client
|
||||
#
|
||||
#
|
||||
# @pytest.fixture
|
||||
# def plot_widget_with_arrow_item(qtbot, mocked_client):
|
||||
# widget = BECFigure(client=mocked_client())
|
||||
# qtbot.addWidget(widget)
|
||||
# qtbot.waitExposed(widget)
|
||||
# waveform = widget.plot()
|
||||
#
|
||||
# yield waveform.arrow_item, waveform.plot_item
|
||||
#
|
||||
#
|
||||
# @pytest.fixture
|
||||
# def plot_widget_with_tick_item(qtbot, mocked_client):
|
||||
# widget = BECFigure(client=mocked_client())
|
||||
# qtbot.addWidget(widget)
|
||||
# qtbot.waitExposed(widget)
|
||||
# waveform = widget.plot()
|
||||
#
|
||||
# yield waveform.tick_item, waveform.plot_item
|
||||
#
|
||||
#
|
||||
# def test_arrow_item_add_to_plot(plot_widget_with_arrow_item):
|
||||
# """Test the add_to_plot method"""
|
||||
# arrow_item, plot_item = plot_widget_with_arrow_item
|
||||
# assert arrow_item.plot_item is not None
|
||||
# assert arrow_item.plot_item.items == []
|
||||
# arrow_item.add_to_plot()
|
||||
# assert arrow_item.plot_item.items == [arrow_item.arrow_item]
|
||||
# arrow_item.remove_from_plot()
|
||||
#
|
||||
#
|
||||
# def test_arrow_item_set_position(plot_widget_with_arrow_item):
|
||||
# """Test the set_position method"""
|
||||
# arrow_item, plot_item = plot_widget_with_arrow_item
|
||||
# container = []
|
||||
#
|
||||
# def signal_callback(tup: tuple):
|
||||
# container.append(tup)
|
||||
#
|
||||
# arrow_item.add_to_plot()
|
||||
# arrow_item.position_changed.connect(signal_callback)
|
||||
# arrow_item.set_position(pos=(1, 1))
|
||||
# point = QPointF(1.0, 1.0)
|
||||
# assert arrow_item.arrow_item.pos() == point
|
||||
# arrow_item.set_position(pos=(2, 2))
|
||||
# point = QPointF(2.0, 2.0)
|
||||
# assert arrow_item.arrow_item.pos() == point
|
||||
# assert container == [(1, 1), (2, 2)]
|
||||
# arrow_item.remove_from_plot()
|
||||
#
|
||||
#
|
||||
# def test_arrow_item_cleanup(plot_widget_with_arrow_item):
|
||||
# """Test cleanup procedure"""
|
||||
# arrow_item, plot_item = plot_widget_with_arrow_item
|
||||
# arrow_item.add_to_plot()
|
||||
# assert arrow_item.item_on_plot is True
|
||||
# arrow_item.cleanup()
|
||||
# assert arrow_item.plot_item.items == []
|
||||
# assert arrow_item.item_on_plot is False
|
||||
# assert arrow_item.arrow_item is None
|
||||
#
|
||||
#
|
||||
# def test_tick_item_add_to_plot(plot_widget_with_tick_item):
|
||||
# """Test the add_to_plot method"""
|
||||
# tick_item, plot_item = plot_widget_with_tick_item
|
||||
# assert tick_item.plot_item is not None
|
||||
# assert tick_item.plot_item.items == []
|
||||
# tick_item.add_to_plot()
|
||||
# assert tick_item.plot_item.layout.itemAt(2, 1) == tick_item.tick_item
|
||||
# assert tick_item.item_on_plot is True
|
||||
# new_pos = plot_item.vb.geometry().bottom()
|
||||
# pos = tick_item.tick.pos()
|
||||
# new_pos = tick_item.tick_item.mapFromParent(QPointF(pos.x(), new_pos))
|
||||
# assert new_pos.y() == pos.y()
|
||||
# tick_item.remove_from_plot()
|
||||
#
|
||||
#
|
||||
# def test_tick_item_set_position(plot_widget_with_tick_item):
|
||||
# """Test the set_position method"""
|
||||
# tick_item, plot_item = plot_widget_with_tick_item
|
||||
# container = []
|
||||
#
|
||||
# def signal_callback(val: float):
|
||||
# container.append(val)
|
||||
#
|
||||
# tick_item.add_to_plot()
|
||||
# tick_item.position_changed.connect(signal_callback)
|
||||
#
|
||||
# tick_item.set_position(pos=1)
|
||||
# assert tick_item._pos == 1
|
||||
# tick_item.set_position(pos=2)
|
||||
# assert tick_item._pos == 2
|
||||
# assert container == [1.0, 2.0]
|
||||
# tick_item.remove_from_plot()
|
||||
#
|
||||
#
|
||||
# def test_tick_item_cleanup(plot_widget_with_tick_item):
|
||||
# """Test cleanup procedure"""
|
||||
# tick_item, plot_item = plot_widget_with_tick_item
|
||||
# tick_item.add_to_plot()
|
||||
# assert tick_item.item_on_plot is True
|
||||
# tick_item.cleanup()
|
||||
# ticks = getattr(tick_item.plot_item.layout.itemAt(3, 1), "ticks", None)
|
||||
# assert ticks == None
|
||||
# assert tick_item.item_on_plot is False
|
||||
# assert tick_item.tick_item is None
|
||||
|
@ -1,766 +0,0 @@
|
||||
# pylint: disable=missing-function-docstring, missing-module-docstring, unused-import
|
||||
from unittest import mock
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
from bec_lib.scan_items import ScanItem
|
||||
|
||||
from bec_widgets.widgets.containers.figure import BECFigure
|
||||
from bec_widgets.widgets.containers.figure.plots.waveform.waveform_curve import (
|
||||
CurveConfig,
|
||||
Signal,
|
||||
SignalData,
|
||||
)
|
||||
|
||||
from .client_mocks import mocked_client
|
||||
from .conftest import create_widget
|
||||
|
||||
|
||||
def test_adding_curve_to_waveform(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot()
|
||||
|
||||
# adding curve which is in bec - only names
|
||||
c1 = w1.add_curve_bec(x_name="samx", y_name="bpm4i")
|
||||
assert c1.config.label == "bpm4i-bpm4i"
|
||||
|
||||
# adding curve which is in bec - names and entry
|
||||
c2 = w1.add_curve_bec(x_name="samx", x_entry="samx", y_name="bpm3a", y_entry="bpm3a")
|
||||
assert c2.config.label == "bpm3a-bpm3a"
|
||||
|
||||
# adding curve which is not in bec
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
w1.add_curve_bec(x_name="non_existent_device", y_name="non_existent_device")
|
||||
assert "Device 'non_existent_device' not found in current BEC session" in str(excinfo.value)
|
||||
|
||||
# adding wrong entry for samx
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
w1.add_curve_bec(
|
||||
x_name="samx", x_entry="non_existent_entry", y_name="bpm3a", y_entry="bpm3a"
|
||||
)
|
||||
assert "Entry 'non_existent_entry' not found in device 'samx' signals" in str(excinfo.value)
|
||||
|
||||
# adding wrong device with validation switched off
|
||||
c3 = w1.add_curve_bec(x_name="samx", y_name="non_existent_device", validate_bec=False)
|
||||
assert c3.config.label == "non_existent_device-non_existent_device"
|
||||
|
||||
|
||||
def test_adding_curve_with_same_id(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot()
|
||||
c1 = w1.add_curve_bec(x_name="samx", y_name="bpm4i", gui_id="test_curve")
|
||||
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
w1.add_curve_bec(x_name="samx", y_name="bpm4i", gui_id="test_curve")
|
||||
assert "Curve with ID 'test_curve' already exists." in str(excinfo.value)
|
||||
|
||||
|
||||
def test_create_waveform1D_by_config(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1_config_input = {
|
||||
"widget_class": "BECWaveform",
|
||||
"gui_id": "widget_1",
|
||||
"parent_id": "BECFigure_1708689320.788527",
|
||||
"row": 0,
|
||||
"col": 0,
|
||||
"axis": {
|
||||
"title": "Widget 1",
|
||||
"title_size": None,
|
||||
"x_label": None,
|
||||
"x_label_size": None,
|
||||
"y_label": None,
|
||||
"y_label_size": None,
|
||||
"legend_label_size": None,
|
||||
"x_scale": "linear",
|
||||
"y_scale": "linear",
|
||||
"x_lim": (1, 10),
|
||||
"y_lim": None,
|
||||
"x_grid": False,
|
||||
"y_grid": False,
|
||||
"outer_axes": False,
|
||||
},
|
||||
"color_palette": "magma",
|
||||
"curves": {
|
||||
"bpm4i-bpm4i": {
|
||||
"widget_class": "BECCurve",
|
||||
"gui_id": "BECCurve_1708689321.226847",
|
||||
"parent_id": "widget_1",
|
||||
"label": "bpm4i-bpm4i",
|
||||
"color": "#cc4778",
|
||||
"color_map_z": "magma",
|
||||
"symbol": "o",
|
||||
"symbol_color": None,
|
||||
"symbol_size": 7,
|
||||
"pen_width": 4,
|
||||
"pen_style": "dash",
|
||||
"source": "scan_segment",
|
||||
"signals": {
|
||||
"dap": None,
|
||||
"source": "scan_segment",
|
||||
"x": {
|
||||
"name": "samx",
|
||||
"entry": "samx",
|
||||
"unit": None,
|
||||
"modifier": None,
|
||||
"limits": None,
|
||||
},
|
||||
"y": {
|
||||
"name": "bpm4i",
|
||||
"entry": "bpm4i",
|
||||
"unit": None,
|
||||
"modifier": None,
|
||||
"limits": None,
|
||||
},
|
||||
"z": None,
|
||||
},
|
||||
},
|
||||
"curve-custom": {
|
||||
"widget_class": "BECCurve",
|
||||
"gui_id": "BECCurve_1708689321.22867",
|
||||
"parent_id": "widget_1",
|
||||
"label": "curve-custom",
|
||||
"color": "blue",
|
||||
"color_map_z": "magma",
|
||||
"symbol": "o",
|
||||
"symbol_color": None,
|
||||
"symbol_size": 7,
|
||||
"pen_width": 5,
|
||||
"pen_style": "dashdot",
|
||||
"source": "custom",
|
||||
"signals": None,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
w1 = bec_figure.plot(config=w1_config_input)
|
||||
|
||||
w1_config_output = w1.get_config()
|
||||
w1_config_input["gui_id"] = w1.gui_id
|
||||
|
||||
assert w1_config_input == w1_config_output
|
||||
assert w1.plot_item.titleLabel.text == "Widget 1"
|
||||
assert w1.config.axis.title == "Widget 1"
|
||||
|
||||
|
||||
def test_change_gui_id(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot()
|
||||
c1 = w1.add_curve_bec(x_name="samx", y_name="bpm4i")
|
||||
w1.change_gui_id("new_id")
|
||||
|
||||
assert w1.config.gui_id == "new_id"
|
||||
assert c1.config.parent_id == "new_id"
|
||||
|
||||
|
||||
def test_getting_curve(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot()
|
||||
c1 = w1.add_curve_bec(x_name="samx", y_name="bpm4i", gui_id="test_curve")
|
||||
c1_expected_config_dark = CurveConfig(
|
||||
widget_class="BECCurve",
|
||||
gui_id="test_curve",
|
||||
parent_id=w1.gui_id,
|
||||
label="bpm4i-bpm4i",
|
||||
color="#3b0f70",
|
||||
symbol="o",
|
||||
symbol_color=None,
|
||||
symbol_size=7,
|
||||
pen_width=4,
|
||||
pen_style="solid",
|
||||
source="scan_segment",
|
||||
signals=Signal(
|
||||
source="scan_segment",
|
||||
x=SignalData(name="samx", entry="samx", unit=None, modifier=None),
|
||||
y=SignalData(name="bpm4i", entry="bpm4i", unit=None, modifier=None),
|
||||
),
|
||||
)
|
||||
c1_expected_config_light = CurveConfig(
|
||||
widget_class="BECCurve",
|
||||
gui_id="test_curve",
|
||||
parent_id=w1.gui_id,
|
||||
label="bpm4i-bpm4i",
|
||||
color="#000004",
|
||||
symbol="o",
|
||||
symbol_color=None,
|
||||
symbol_size=7,
|
||||
pen_width=4,
|
||||
pen_style="solid",
|
||||
source="scan_segment",
|
||||
signals=Signal(
|
||||
source="scan_segment",
|
||||
x=SignalData(name="samx", entry="samx", unit=None, modifier=None),
|
||||
y=SignalData(name="bpm4i", entry="bpm4i", unit=None, modifier=None),
|
||||
),
|
||||
)
|
||||
|
||||
assert (
|
||||
w1.curves[0].config == c1_expected_config_dark
|
||||
or w1.curves[0].config == c1_expected_config_light
|
||||
)
|
||||
assert (
|
||||
w1._curves_data["scan_segment"]["bpm4i-bpm4i"].config == c1_expected_config_dark
|
||||
or w1._curves_data["scan_segment"]["bpm4i-bpm4i"].config == c1_expected_config_light
|
||||
)
|
||||
assert (
|
||||
w1.get_curve(0).config == c1_expected_config_dark
|
||||
or w1.get_curve(0).config == c1_expected_config_light
|
||||
)
|
||||
assert (
|
||||
w1.get_curve_config("bpm4i-bpm4i", dict_output=True) == c1_expected_config_dark.model_dump()
|
||||
or w1.get_curve_config("bpm4i-bpm4i", dict_output=True)
|
||||
== c1_expected_config_light.model_dump()
|
||||
)
|
||||
assert (
|
||||
w1.get_curve_config("bpm4i-bpm4i", dict_output=False) == c1_expected_config_dark
|
||||
or w1.get_curve_config("bpm4i-bpm4i", dict_output=False) == c1_expected_config_light
|
||||
)
|
||||
assert (
|
||||
w1.get_curve("bpm4i-bpm4i").config == c1_expected_config_dark
|
||||
or w1.get_curve("bpm4i-bpm4i").config == c1_expected_config_light
|
||||
)
|
||||
assert (
|
||||
c1.get_config(False) == c1_expected_config_dark
|
||||
or c1.get_config(False) == c1_expected_config_light
|
||||
)
|
||||
assert (
|
||||
c1.get_config() == c1_expected_config_dark.model_dump()
|
||||
or c1.get_config() == c1_expected_config_light.model_dump()
|
||||
)
|
||||
|
||||
|
||||
def test_getting_curve_errors(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot()
|
||||
c1 = w1.add_curve_bec(x_name="samx", y_name="bpm4i", gui_id="test_curve")
|
||||
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
w1.get_curve("non_existent_curve")
|
||||
assert "Curve with ID 'non_existent_curve' not found." in str(excinfo.value)
|
||||
with pytest.raises(IndexError) as excinfo:
|
||||
w1.get_curve(1)
|
||||
assert "list index out of range" in str(excinfo.value)
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
w1.get_curve(1.2)
|
||||
assert "Identifier must be either an integer (index) or a string (curve_id)." in str(
|
||||
excinfo.value
|
||||
)
|
||||
|
||||
|
||||
def test_add_curve(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot()
|
||||
|
||||
c1 = w1.add_curve_bec(x_name="samx", y_name="bpm4i")
|
||||
|
||||
assert len(w1.curves) == 1
|
||||
assert w1._curves_data["scan_segment"] == {"bpm4i-bpm4i": c1}
|
||||
assert c1.config.label == "bpm4i-bpm4i"
|
||||
assert c1.config.source == "scan_segment"
|
||||
|
||||
|
||||
def test_change_legend_font_size(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
plot = bec_figure.plot()
|
||||
|
||||
w1 = plot.add_curve_bec(x_name="samx", y_name="bpm4i")
|
||||
my_func = plot.plot_item.legend
|
||||
with mock.patch.object(my_func, "setScale") as mock_set_scale:
|
||||
plot.set_legend_label_size(18)
|
||||
assert plot.config.axis.legend_label_size == 18
|
||||
assert mock_set_scale.call_count == 1
|
||||
assert mock_set_scale.call_args == mock.call(2)
|
||||
|
||||
|
||||
def test_remove_curve(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot()
|
||||
|
||||
w1.add_curve_bec(x_name="samx", y_name="bpm4i")
|
||||
w1.add_curve_bec(x_name="samx", y_name="bpm3a")
|
||||
w1.remove_curve(0)
|
||||
w1.remove_curve("bpm3a-bpm3a")
|
||||
|
||||
assert len(w1.plot_item.curves) == 0
|
||||
assert w1._curves_data["scan_segment"] == {}
|
||||
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
w1.remove_curve(1.2)
|
||||
assert "Each identifier must be either an integer (index) or a string (curve_id)." in str(
|
||||
excinfo.value
|
||||
)
|
||||
|
||||
|
||||
def test_change_curve_appearance_methods(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot()
|
||||
|
||||
c1 = w1.add_curve_bec(x_name="samx", y_name="bpm4i")
|
||||
|
||||
c1.set_color("#0000ff")
|
||||
c1.set_symbol("x")
|
||||
c1.set_symbol_color("#ff0000")
|
||||
c1.set_symbol_size(10)
|
||||
c1.set_pen_width(3)
|
||||
c1.set_pen_style("dashdot")
|
||||
|
||||
qtbot.wait(500)
|
||||
assert c1.config.color == "#0000ff"
|
||||
assert c1.config.symbol == "x"
|
||||
assert c1.config.symbol_color == "#ff0000"
|
||||
assert c1.config.symbol_size == 10
|
||||
assert c1.config.pen_width == 3
|
||||
assert c1.config.pen_style == "dashdot"
|
||||
assert c1.config.source == "scan_segment"
|
||||
assert c1.config.signals.model_dump() == {
|
||||
"dap": None,
|
||||
"source": "scan_segment",
|
||||
"x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
|
||||
"y": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None, "limits": None},
|
||||
"z": None,
|
||||
}
|
||||
|
||||
|
||||
def test_change_curve_appearance_args(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot()
|
||||
|
||||
c1 = w1.add_curve_bec(x_name="samx", y_name="bpm4i")
|
||||
|
||||
c1.set(
|
||||
color="#0000ff",
|
||||
symbol="x",
|
||||
symbol_color="#ff0000",
|
||||
symbol_size=10,
|
||||
pen_width=3,
|
||||
pen_style="dashdot",
|
||||
)
|
||||
|
||||
assert c1.config.color == "#0000ff"
|
||||
assert c1.config.symbol == "x"
|
||||
assert c1.config.symbol_color == "#ff0000"
|
||||
assert c1.config.symbol_size == 10
|
||||
assert c1.config.pen_width == 3
|
||||
assert c1.config.pen_style == "dashdot"
|
||||
assert c1.config.source == "scan_segment"
|
||||
assert c1.config.signals.model_dump() == {
|
||||
"dap": None,
|
||||
"source": "scan_segment",
|
||||
"x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
|
||||
"y": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None, "limits": None},
|
||||
"z": None,
|
||||
}
|
||||
|
||||
|
||||
def test_set_custom_curve_data(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot()
|
||||
|
||||
c1 = w1.add_curve_custom(
|
||||
x=[1, 2, 3],
|
||||
y=[4, 5, 6],
|
||||
label="custom_curve",
|
||||
color="#0000ff",
|
||||
symbol="x",
|
||||
symbol_color="#ff0000",
|
||||
symbol_size=10,
|
||||
pen_width=3,
|
||||
pen_style="dashdot",
|
||||
)
|
||||
|
||||
x_init, y_init = c1.get_data()
|
||||
|
||||
assert np.array_equal(x_init, [1, 2, 3])
|
||||
assert np.array_equal(y_init, [4, 5, 6])
|
||||
assert c1.config.label == "custom_curve"
|
||||
assert c1.config.color == "#0000ff"
|
||||
assert c1.config.symbol == "x"
|
||||
assert c1.config.symbol_color == "#ff0000"
|
||||
assert c1.config.symbol_size == 10
|
||||
assert c1.config.pen_width == 3
|
||||
assert c1.config.pen_style == "dashdot"
|
||||
assert c1.config.source == "custom"
|
||||
assert c1.config.signals == None
|
||||
|
||||
c1.set_data(x=[4, 5, 6], y=[7, 8, 9])
|
||||
|
||||
x_new, y_new = c1.get_data()
|
||||
assert np.array_equal(x_new, [4, 5, 6])
|
||||
assert np.array_equal(y_new, [7, 8, 9])
|
||||
|
||||
|
||||
def test_custom_data_2D_array(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
|
||||
data = np.random.rand(10, 2)
|
||||
|
||||
plt = bec_figure.plot(data)
|
||||
|
||||
x, y = plt.curves[0].get_data()
|
||||
|
||||
assert np.array_equal(x, data[:, 0])
|
||||
assert np.array_equal(y, data[:, 1])
|
||||
|
||||
|
||||
def test_get_all_data(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot()
|
||||
|
||||
c1 = w1.add_curve_custom(
|
||||
x=[1, 2, 3],
|
||||
y=[4, 5, 6],
|
||||
label="custom_curve-1",
|
||||
color="#0000ff",
|
||||
symbol="x",
|
||||
symbol_color="#ff0000",
|
||||
symbol_size=10,
|
||||
pen_width=3,
|
||||
pen_style="dashdot",
|
||||
)
|
||||
|
||||
c2 = w1.add_curve_custom(
|
||||
x=[4, 5, 6],
|
||||
y=[7, 8, 9],
|
||||
label="custom_curve-2",
|
||||
color="#00ff00",
|
||||
symbol="o",
|
||||
symbol_color="#00ff00",
|
||||
symbol_size=20,
|
||||
pen_width=4,
|
||||
pen_style="dash",
|
||||
)
|
||||
|
||||
all_data = w1.get_all_data()
|
||||
|
||||
assert all_data == {
|
||||
"custom_curve-1": {"x": [1, 2, 3], "y": [4, 5, 6]},
|
||||
"custom_curve-2": {"x": [4, 5, 6], "y": [7, 8, 9]},
|
||||
}
|
||||
|
||||
|
||||
def test_curve_add_by_config(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot()
|
||||
|
||||
c1_config_input = {
|
||||
"widget_class": "BECCurve",
|
||||
"gui_id": "BECCurve_1708689321.226847",
|
||||
"parent_id": "widget_1",
|
||||
"label": "bpm4i-bpm4i",
|
||||
"color": "#cc4778",
|
||||
"color_map_z": "magma",
|
||||
"symbol": "o",
|
||||
"symbol_color": None,
|
||||
"symbol_size": 7,
|
||||
"pen_width": 4,
|
||||
"pen_style": "dash",
|
||||
"source": "scan_segment",
|
||||
"signals": {
|
||||
"dap": None,
|
||||
"source": "scan_segment",
|
||||
"x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
|
||||
"y": {
|
||||
"name": "bpm4i",
|
||||
"entry": "bpm4i",
|
||||
"unit": None,
|
||||
"modifier": None,
|
||||
"limits": None,
|
||||
},
|
||||
"z": None,
|
||||
},
|
||||
}
|
||||
|
||||
c1 = w1.add_curve_by_config(c1_config_input)
|
||||
|
||||
c1_config_dict = c1.get_config()
|
||||
|
||||
assert c1_config_dict == c1_config_input
|
||||
assert c1.config == CurveConfig(**c1_config_input)
|
||||
assert c1.get_config(False) == CurveConfig(**c1_config_input)
|
||||
|
||||
|
||||
def test_scan_update(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot()
|
||||
|
||||
c1 = w1.add_curve_bec(x_name="samx", y_name="bpm4i")
|
||||
|
||||
msg_waveform = {
|
||||
"data": {
|
||||
"samx": {"samx": {"value": 10}},
|
||||
"bpm4i": {"bpm4i": {"value": 5}},
|
||||
"gauss_bpm": {"gauss_bpm": {"value": 6}},
|
||||
"gauss_adc1": {"gauss_adc1": {"value": 8}},
|
||||
"gauss_adc2": {"gauss_adc2": {"value": 9}},
|
||||
},
|
||||
"scan_id": 1,
|
||||
}
|
||||
# Mock scan_storage.find_scan_by_ID
|
||||
mock_scan_data_waveform = mock.MagicMock(spec=ScanItem)
|
||||
mock_scan_data_waveform.live_data = {
|
||||
device_name: {
|
||||
entry: mock.MagicMock(val=[msg_waveform["data"][device_name][entry]["value"]])
|
||||
for entry in msg_waveform["data"][device_name]
|
||||
}
|
||||
for device_name in msg_waveform["data"]
|
||||
}
|
||||
|
||||
metadata_waveform = {"scan_name": "line_scan"}
|
||||
|
||||
w1.queue.scan_storage.find_scan_by_ID.return_value = mock_scan_data_waveform
|
||||
|
||||
w1.on_scan_segment(msg_waveform, metadata_waveform)
|
||||
qtbot.wait(500)
|
||||
assert c1.get_data() == ([10], [5])
|
||||
|
||||
|
||||
def test_scan_history_with_val_access(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot()
|
||||
|
||||
w1.plot(x_name="samx", y_name="bpm4i")
|
||||
|
||||
mock_scan_data = {
|
||||
"samx": {"samx": mock.MagicMock(val=np.array([1, 2, 3]))}, # Use mock.MagicMock for .val
|
||||
"bpm4i": {"bpm4i": mock.MagicMock(val=np.array([4, 5, 6]))}, # Use mock.MagicMock for .val
|
||||
}
|
||||
|
||||
mock_scan_storage = mock.MagicMock()
|
||||
scan_item_mock = mock.MagicMock(spec=ScanItem)
|
||||
scan_item_mock.data = mock_scan_data
|
||||
mock_scan_storage.find_scan_by_ID.return_value = scan_item_mock
|
||||
w1.queue.scan_storage = mock_scan_storage
|
||||
|
||||
fake_scan_id = "fake_scan_id"
|
||||
w1.scan_history(scan_id=fake_scan_id)
|
||||
|
||||
qtbot.wait(500)
|
||||
|
||||
x_data, y_data = w1.curves[0].get_data()
|
||||
|
||||
assert np.array_equal(x_data, [1, 2, 3])
|
||||
assert np.array_equal(y_data, [4, 5, 6])
|
||||
|
||||
|
||||
def test_scatter_2d_update(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot()
|
||||
|
||||
c1 = w1.add_curve_bec(x_name="samx", y_name="samx", z_name="bpm4i")
|
||||
|
||||
msg = {
|
||||
"data": {
|
||||
"samx": {"samx": {"value": [1, 2, 3]}},
|
||||
"samy": {"samy": {"value": [4, 5, 6]}},
|
||||
"bpm4i": {"bpm4i": {"value": [1, 3, 2]}},
|
||||
},
|
||||
"scan_id": 1,
|
||||
}
|
||||
msg_metadata = {"scan_name": "line_scan"}
|
||||
|
||||
mock_scan_item = mock.MagicMock(spec=ScanItem)
|
||||
mock_scan_item.live_data = {
|
||||
device_name: {
|
||||
entry: mock.MagicMock(val=msg["data"][device_name][entry]["value"])
|
||||
for entry in msg["data"][device_name]
|
||||
}
|
||||
for device_name in msg["data"]
|
||||
}
|
||||
|
||||
w1.queue.scan_storage.find_scan_by_ID.return_value = mock_scan_item
|
||||
|
||||
w1.on_scan_segment(msg, msg_metadata)
|
||||
qtbot.wait(500)
|
||||
|
||||
data = c1.get_data()
|
||||
expected_x_y_data = ([1, 2, 3], [1, 2, 3])
|
||||
expected_z_colors = w1._make_z_gradient([1, 3, 2], "magma")
|
||||
|
||||
scatter_points = c1.scatter.points()
|
||||
colors = [point.brush().color() for point in scatter_points]
|
||||
|
||||
assert np.array_equal(data, expected_x_y_data)
|
||||
assert colors == expected_z_colors
|
||||
|
||||
|
||||
def test_waveform_single_arg_inputs(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot()
|
||||
|
||||
w1.plot("bpm4i")
|
||||
w1.plot([1, 2, 3], label="just_y")
|
||||
w1.plot([3, 4, 5], [7, 8, 9], label="x_y")
|
||||
w1.plot(x=[1, 2, 3], y=[4, 5, 6], label="x_y_kwargs")
|
||||
data_array_1D = np.random.rand(10)
|
||||
data_array_2D = np.random.rand(10, 2)
|
||||
w1.plot(data_array_1D, label="np_ndarray 1D")
|
||||
w1.plot(data_array_2D, label="np_ndarray 2D")
|
||||
|
||||
qtbot.wait(200)
|
||||
|
||||
assert w1._curves_data["scan_segment"]["bpm4i-bpm4i"].config.label == "bpm4i-bpm4i"
|
||||
assert w1._curves_data["custom"]["just_y"].config.label == "just_y"
|
||||
assert w1._curves_data["custom"]["x_y"].config.label == "x_y"
|
||||
assert w1._curves_data["custom"]["x_y_kwargs"].config.label == "x_y_kwargs"
|
||||
|
||||
assert np.array_equal(w1._curves_data["custom"]["just_y"].get_data(), ([0, 1, 2], [1, 2, 3]))
|
||||
assert np.array_equal(w1._curves_data["custom"]["just_y"].get_data(), ([0, 1, 2], [1, 2, 3]))
|
||||
assert np.array_equal(w1._curves_data["custom"]["x_y"].get_data(), ([3, 4, 5], [7, 8, 9]))
|
||||
assert np.array_equal(
|
||||
w1._curves_data["custom"]["x_y_kwargs"].get_data(), ([1, 2, 3], [4, 5, 6])
|
||||
)
|
||||
assert np.array_equal(
|
||||
w1._curves_data["custom"]["np_ndarray 1D"].get_data(),
|
||||
(np.arange(data_array_1D.size), data_array_1D.T),
|
||||
)
|
||||
assert np.array_equal(w1._curves_data["custom"]["np_ndarray 2D"].get_data(), data_array_2D.T)
|
||||
|
||||
|
||||
def test_waveform_set_x_sync(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot()
|
||||
custom_label = "custom_label"
|
||||
w1.plot("bpm4i")
|
||||
w1.set_x_label(custom_label)
|
||||
|
||||
scan_item_mock = mock.MagicMock(spec=ScanItem)
|
||||
mock_data = {
|
||||
"samx": {"samx": mock.MagicMock(val=np.array([1, 2, 3]))},
|
||||
"samy": {"samy": mock.MagicMock(val=np.array([4, 5, 6]))},
|
||||
"bpm4i": {
|
||||
"bpm4i": mock.MagicMock(
|
||||
val=np.array([7, 8, 9]),
|
||||
timestamps=np.array([1720520189.959115, 1720520189.986618, 1720520190.0157812]),
|
||||
)
|
||||
},
|
||||
}
|
||||
|
||||
scan_item_mock.live_data = mock_data
|
||||
scan_item_mock.status_message = mock.MagicMock()
|
||||
scan_item_mock.status_message.info = {"scan_report_devices": ["samx"]}
|
||||
|
||||
w1.queue.scan_storage.find_scan_by_ID.return_value = scan_item_mock
|
||||
|
||||
w1.on_scan_segment({"scan_id": 1}, {})
|
||||
qtbot.wait(200)
|
||||
|
||||
# Best effort - samx
|
||||
x_data, y_data = w1.curves[0].get_data()
|
||||
assert np.array_equal(x_data, [1, 2, 3])
|
||||
assert np.array_equal(y_data, [7, 8, 9])
|
||||
assert w1.plot_item.getAxis("bottom").labelText == custom_label + " [auto: samx-samx]"
|
||||
|
||||
# Change to samy
|
||||
w1.set_x("samy")
|
||||
qtbot.wait(200)
|
||||
x_data, y_data = w1.curves[0].get_data()
|
||||
assert np.array_equal(x_data, [4, 5, 6])
|
||||
assert np.array_equal(y_data, [7, 8, 9])
|
||||
assert w1.plot_item.getAxis("bottom").labelText == custom_label + " [samy-samy]"
|
||||
|
||||
# change to index
|
||||
w1.set_x("index")
|
||||
qtbot.wait(200)
|
||||
x_data, y_data = w1.curves[0].get_data()
|
||||
assert np.array_equal(x_data, [0, 1, 2])
|
||||
assert np.array_equal(y_data, [7, 8, 9])
|
||||
assert w1.plot_item.getAxis("bottom").labelText == custom_label + " [index]"
|
||||
|
||||
# change to timestamp
|
||||
w1.set_x("timestamp")
|
||||
qtbot.wait(200)
|
||||
x_data, y_data = w1.curves[0].get_data()
|
||||
assert np.allclose(x_data, np.array([1.72052019e09, 1.72052019e09, 1.72052019e09]))
|
||||
assert np.array_equal(y_data, [7, 8, 9])
|
||||
assert w1.plot_item.getAxis("bottom").labelText == custom_label + " [timestamp]"
|
||||
|
||||
|
||||
def test_waveform_async_data_update(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot("async_device")
|
||||
custom_label = "custom_label"
|
||||
w1.set_x_label(custom_label)
|
||||
|
||||
# scan_item_mock = mock.MagicMock()
|
||||
# mock_data = {
|
||||
# "async_device": {
|
||||
# "async_device": mock.MagicMock(
|
||||
# val=np.array([7, 8, 9]),
|
||||
# timestamps=np.array([1720520189.959115, 1720520189.986618, 1720520190.0157812]),
|
||||
# )
|
||||
# }
|
||||
# }
|
||||
#
|
||||
# scan_item_mock.async_data = mock_data
|
||||
# w1.queue.scan_storage.find_scan_by_ID.return_value = scan_item_mock
|
||||
|
||||
msg_1 = {"signals": {"async_device": {"value": [7, 8, 9]}}}
|
||||
metadata_1 = {"async_update": {"max_shape": [None], "type": "add"}}
|
||||
w1.on_async_readback(msg_1, metadata_1)
|
||||
|
||||
qtbot.wait(200)
|
||||
x_data, y_data = w1.curves[0].get_data()
|
||||
assert np.array_equal(x_data, [0, 1, 2])
|
||||
assert np.array_equal(y_data, [7, 8, 9])
|
||||
assert w1.plot_item.getAxis("bottom").labelText == custom_label + " [best_effort]"
|
||||
|
||||
msg_2 = {"signals": {"async_device": {"value": [10, 11, 12]}}}
|
||||
w1.on_async_readback(msg_2, metadata_1)
|
||||
|
||||
qtbot.wait(200)
|
||||
x_data, y_data = w1.curves[0].get_data()
|
||||
assert np.array_equal(x_data, [0, 1, 2, 3, 4, 5])
|
||||
assert np.array_equal(y_data, [7, 8, 9, 10, 11, 12])
|
||||
assert w1.plot_item.getAxis("bottom").labelText == custom_label + " [best_effort]"
|
||||
|
||||
msg_3 = {"signals": {"async_device": {"value": [20, 21, 22]}}}
|
||||
metadata_3 = {"async_update": {"max_shape": [None], "type": "replace"}}
|
||||
w1.on_async_readback(msg_3, metadata_3)
|
||||
|
||||
qtbot.wait(200)
|
||||
x_data, y_data = w1.curves[0].get_data()
|
||||
assert np.array_equal(x_data, [0, 1, 2])
|
||||
assert np.array_equal(y_data, [20, 21, 22])
|
||||
assert w1.plot_item.getAxis("bottom").labelText == custom_label + " [best_effort]"
|
||||
|
||||
|
||||
def test_waveform_set_x_async(qtbot, mocked_client):
|
||||
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
|
||||
w1 = bec_figure.plot("async_device")
|
||||
custom_label = "custom_label"
|
||||
w1.set_x_label(custom_label)
|
||||
|
||||
scan_item_mock = mock.MagicMock()
|
||||
mock_data = {
|
||||
"async_device": {
|
||||
"async_device": {
|
||||
"value": np.array([7, 8, 9]),
|
||||
"timestamp": np.array([1720520189.959115, 1720520189.986618, 1720520190.0157812]),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
scan_item_mock.async_data = mock_data
|
||||
w1.queue.scan_storage.find_scan_by_ID.return_value = scan_item_mock
|
||||
|
||||
w1.on_scan_status({"scan_id": 1})
|
||||
w1.replot_async_curve()
|
||||
|
||||
qtbot.wait(200)
|
||||
x_data, y_data = w1.curves[0].get_data()
|
||||
assert np.array_equal(x_data, [0, 1, 2])
|
||||
assert np.array_equal(y_data, [7, 8, 9])
|
||||
assert w1.plot_item.getAxis("bottom").labelText == custom_label + " [best_effort]"
|
||||
|
||||
w1.set_x("timestamp")
|
||||
qtbot.wait(200)
|
||||
x_data, y_data = w1.curves[0].get_data()
|
||||
assert np.allclose(x_data, np.array([1.72052019e09, 1.72052019e09, 1.72052019e09]))
|
||||
assert np.array_equal(y_data, [7, 8, 9])
|
||||
assert w1.plot_item.getAxis("bottom").labelText == custom_label + " [timestamp]"
|
||||
|
||||
w1.set_x("index")
|
||||
qtbot.wait(200)
|
||||
x_data, y_data = w1.curves[0].get_data()
|
||||
assert np.array_equal(x_data, [0, 1, 2])
|
||||
assert np.array_equal(y_data, [7, 8, 9])
|
||||
assert w1.plot_item.getAxis("bottom").labelText == custom_label + " [index]"
|
Reference in New Issue
Block a user