mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 03:31:50 +02:00
feat(waveform): new Waveform widget based on NextGen PlotBase
This commit is contained in:
@ -1,8 +1,11 @@
|
||||
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
|
||||
from math import inf
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import fakeredis
|
||||
import pytest
|
||||
from bec_lib.bec_service import messages
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.redis_connector import RedisConnector
|
||||
|
||||
from bec_widgets.tests.utils import DEVICES, DMMock, FakePositioner, Positioner
|
||||
@ -50,3 +53,150 @@ def mocked_client(bec_dispatcher):
|
||||
with patch("builtins.isinstance", new=isinstance_mock):
|
||||
yield client
|
||||
connector.shutdown() # TODO change to real BECClient
|
||||
|
||||
|
||||
##################################################
|
||||
# Client Fixture with DAP
|
||||
##################################################
|
||||
@pytest.fixture(scope="function")
|
||||
def dap_plugin_message():
|
||||
msg = messages.AvailableResourceMessage(
|
||||
**{
|
||||
"resource": {
|
||||
"GaussianModel": {
|
||||
"class": "LmfitService1D",
|
||||
"user_friendly_name": "GaussianModel",
|
||||
"class_doc": "A model based on a Gaussian or normal distribution lineshape.\n\n The model has three Parameters: `amplitude`, `center`, and `sigma`.\n In addition, parameters `fwhm` and `height` are included as\n constraints to report full width at half maximum and maximum peak\n height, respectively.\n\n .. math::\n\n f(x; A, \\mu, \\sigma) = \\frac{A}{\\sigma\\sqrt{2\\pi}} e^{[{-{(x-\\mu)^2}/{{2\\sigma}^2}}]}\n\n where the parameter `amplitude` corresponds to :math:`A`, `center` to\n :math:`\\mu`, and `sigma` to :math:`\\sigma`. The full width at half\n maximum is :math:`2\\sigma\\sqrt{2\\ln{2}}`, approximately\n :math:`2.3548\\sigma`.\n\n For more information, see: https://en.wikipedia.org/wiki/Normal_distribution\n\n ",
|
||||
"run_doc": "A model based on a Gaussian or normal distribution lineshape.\n\n The model has three Parameters: `amplitude`, `center`, and `sigma`.\n In addition, parameters `fwhm` and `height` are included as\n constraints to report full width at half maximum and maximum peak\n height, respectively.\n\n .. math::\n\n f(x; A, \\mu, \\sigma) = \\frac{A}{\\sigma\\sqrt{2\\pi}} e^{[{-{(x-\\mu)^2}/{{2\\sigma}^2}}]}\n\n where the parameter `amplitude` corresponds to :math:`A`, `center` to\n :math:`\\mu`, and `sigma` to :math:`\\sigma`. The full width at half\n maximum is :math:`2\\sigma\\sqrt{2\\ln{2}}`, approximately\n :math:`2.3548\\sigma`.\n\n For more information, see: https://en.wikipedia.org/wiki/Normal_distribution\n\n \n Args:\n scan_item (ScanItem): Scan item or scan ID\n device_x (DeviceBase | str): Device name for x\n signal_x (DeviceBase | str): Signal name for x\n device_y (DeviceBase | str): Device name for y\n signal_y (DeviceBase | str): Signal name for y\n parameters (dict): Fit parameters\n ",
|
||||
"run_name": "fit",
|
||||
"signature": [
|
||||
{
|
||||
"name": "args",
|
||||
"kind": "VAR_POSITIONAL",
|
||||
"default": "_empty",
|
||||
"annotation": "_empty",
|
||||
},
|
||||
{
|
||||
"name": "scan_item",
|
||||
"kind": "KEYWORD_ONLY",
|
||||
"default": None,
|
||||
"annotation": "ScanItem | str",
|
||||
},
|
||||
{
|
||||
"name": "device_x",
|
||||
"kind": "KEYWORD_ONLY",
|
||||
"default": None,
|
||||
"annotation": "DeviceBase | str",
|
||||
},
|
||||
{
|
||||
"name": "signal_x",
|
||||
"kind": "KEYWORD_ONLY",
|
||||
"default": None,
|
||||
"annotation": "DeviceBase | str",
|
||||
},
|
||||
{
|
||||
"name": "device_y",
|
||||
"kind": "KEYWORD_ONLY",
|
||||
"default": None,
|
||||
"annotation": "DeviceBase | str",
|
||||
},
|
||||
{
|
||||
"name": "signal_y",
|
||||
"kind": "KEYWORD_ONLY",
|
||||
"default": None,
|
||||
"annotation": "DeviceBase | str",
|
||||
},
|
||||
{
|
||||
"name": "parameters",
|
||||
"kind": "KEYWORD_ONLY",
|
||||
"default": None,
|
||||
"annotation": "dict",
|
||||
},
|
||||
{
|
||||
"name": "kwargs",
|
||||
"kind": "VAR_KEYWORD",
|
||||
"default": "_empty",
|
||||
"annotation": "_empty",
|
||||
},
|
||||
],
|
||||
"auto_fit_supported": True,
|
||||
"params": {
|
||||
"amplitude": {
|
||||
"name": "amplitude",
|
||||
"value": 1.0,
|
||||
"vary": True,
|
||||
"min": -inf,
|
||||
"max": inf,
|
||||
"expr": None,
|
||||
"brute_step": None,
|
||||
"user_data": None,
|
||||
},
|
||||
"center": {
|
||||
"name": "center",
|
||||
"value": 0.0,
|
||||
"vary": True,
|
||||
"min": -inf,
|
||||
"max": inf,
|
||||
"expr": None,
|
||||
"brute_step": None,
|
||||
"user_data": None,
|
||||
},
|
||||
"sigma": {
|
||||
"name": "sigma",
|
||||
"value": 1.0,
|
||||
"vary": True,
|
||||
"min": 0,
|
||||
"max": inf,
|
||||
"expr": None,
|
||||
"brute_step": None,
|
||||
"user_data": None,
|
||||
},
|
||||
"fwhm": {
|
||||
"name": "fwhm",
|
||||
"value": 2.35482,
|
||||
"vary": False,
|
||||
"min": -inf,
|
||||
"max": inf,
|
||||
"expr": "2.3548200*sigma",
|
||||
"brute_step": None,
|
||||
"user_data": None,
|
||||
},
|
||||
"height": {
|
||||
"name": "height",
|
||||
"value": 0.3989423,
|
||||
"vary": False,
|
||||
"min": -inf,
|
||||
"max": inf,
|
||||
"expr": "0.3989423*amplitude/max(1e-15, sigma)",
|
||||
"brute_step": None,
|
||||
"user_data": None,
|
||||
},
|
||||
},
|
||||
"class_args": [],
|
||||
"class_kwargs": {"model": "GaussianModel"},
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
yield msg
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def mocked_client_with_dap(mocked_client, dap_plugin_message):
|
||||
dap_services = {
|
||||
"BECClient": messages.StatusMessage(name="BECClient", status=1, info={}),
|
||||
"DAPServer/LmfitService1D": messages.StatusMessage(
|
||||
name="LmfitService1D", status=1, info={}
|
||||
),
|
||||
}
|
||||
client = mocked_client
|
||||
client.service_status = dap_services
|
||||
client.connector.set(
|
||||
topic=MessageEndpoints.dap_available_plugins("dap"), msg=dap_plugin_message
|
||||
)
|
||||
|
||||
# Patch the client's DAP attribute so that the available models include "GaussianModel"
|
||||
patched_models = {"GaussianModel": {}, "LorentzModel": {}, "SineModel": {}}
|
||||
client.dap._available_dap_plugins = patched_models
|
||||
|
||||
yield client
|
||||
|
367
tests/unit_tests/test_curve_settings.py
Normal file
367
tests/unit_tests/test_curve_settings.py
Normal file
@ -0,0 +1,367 @@
|
||||
import json
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from qtpy.QtWidgets import QComboBox, QVBoxLayout
|
||||
|
||||
from bec_widgets.widgets.plots_next_gen.waveform.settings.curve_settings.curve_setting import (
|
||||
CurveSetting,
|
||||
)
|
||||
from bec_widgets.widgets.plots_next_gen.waveform.settings.curve_settings.curve_tree import CurveTree
|
||||
from bec_widgets.widgets.plots_next_gen.waveform.waveform import Waveform
|
||||
from tests.unit_tests.client_mocks import dap_plugin_message, mocked_client, mocked_client_with_dap
|
||||
from tests.unit_tests.conftest import create_widget
|
||||
|
||||
##################################################
|
||||
# CurveSetting
|
||||
##################################################
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def curve_setting_fixture(qtbot, mocked_client):
|
||||
"""
|
||||
Creates a CurveSetting widget targeting a mock or real Waveform widget.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
wf.x_mode = "auto"
|
||||
curve_setting = create_widget(qtbot, CurveSetting, parent=None, target_widget=wf)
|
||||
return curve_setting, wf
|
||||
|
||||
|
||||
def test_curve_setting_init(curve_setting_fixture):
|
||||
"""
|
||||
Ensure CurveSetting constructs properly, with a CurveTree inside
|
||||
and an x-axis group box for modes.
|
||||
"""
|
||||
curve_setting, wf = curve_setting_fixture
|
||||
|
||||
# Basic checks
|
||||
assert curve_setting.objectName() == "CurveSetting"
|
||||
# The layout should be QVBoxLayout
|
||||
assert isinstance(curve_setting.layout, QVBoxLayout)
|
||||
|
||||
# There's an x_axis_box group and a y_axis_box group
|
||||
assert hasattr(curve_setting, "x_axis_box")
|
||||
assert hasattr(curve_setting, "y_axis_box")
|
||||
|
||||
# The x_axis_box should contain a QComboBox for mode
|
||||
mode_combo = curve_setting.mode_combo
|
||||
assert isinstance(mode_combo, QComboBox)
|
||||
# Should contain these items: ["auto", "index", "timestamp", "device"]
|
||||
expected_modes = ["auto", "index", "timestamp", "device"]
|
||||
for m in expected_modes:
|
||||
assert m in [
|
||||
curve_setting.mode_combo.itemText(i) for i in range(curve_setting.mode_combo.count())
|
||||
]
|
||||
|
||||
# Check that there's a curve_manager inside y_axis_box
|
||||
assert hasattr(curve_setting, "curve_manager")
|
||||
assert curve_setting.y_axis_box.layout.count() > 0
|
||||
|
||||
|
||||
def test_curve_setting_accept_changes(curve_setting_fixture, qtbot):
|
||||
"""
|
||||
Test that calling accept_changes() applies x-axis mode changes
|
||||
and triggers the CurveTree to send its curve JSON to the target waveform.
|
||||
"""
|
||||
curve_setting, wf = curve_setting_fixture
|
||||
|
||||
# Suppose user chooses "index" from the combo
|
||||
curve_setting.mode_combo.setCurrentText("index")
|
||||
# The device_x is disabled if not device mode
|
||||
|
||||
# Spy on 'send_curve_json' from the curve_manager
|
||||
send_spy = MagicMock()
|
||||
curve_setting.curve_manager.send_curve_json = send_spy
|
||||
|
||||
# Call accept_changes()
|
||||
curve_setting.accept_changes()
|
||||
|
||||
# Check that we updated the waveform
|
||||
assert wf.x_mode == "index"
|
||||
# Check that the manager send_curve_json was called
|
||||
send_spy.assert_called_once()
|
||||
|
||||
|
||||
def test_curve_setting_switch_device_mode(curve_setting_fixture, qtbot):
|
||||
"""
|
||||
If user chooses device mode from the combo, the device_x line edit should be enabled
|
||||
and set to the current wavefrom.x_axis_mode["name"].
|
||||
"""
|
||||
curve_setting, wf = curve_setting_fixture
|
||||
|
||||
# Initially we assume "auto"
|
||||
assert curve_setting.mode_combo.currentText() == "auto"
|
||||
# Switch to device
|
||||
curve_setting.mode_combo.setCurrentText("device")
|
||||
assert curve_setting.device_x.isEnabled()
|
||||
|
||||
# This line edit should reflect the waveform.x_axis_mode["name"], or be blank if none
|
||||
assert curve_setting.device_x.text() == wf.x_axis_mode["name"]
|
||||
|
||||
|
||||
def test_curve_setting_refresh(curve_setting_fixture, qtbot):
|
||||
"""
|
||||
Test that calling refresh() refreshes the embedded CurveTree
|
||||
and re-reads the x axis mode from the waveform.
|
||||
"""
|
||||
curve_setting, wf = curve_setting_fixture
|
||||
|
||||
# Suppose the waveform changed x_mode from "auto" to "timestamp" behind the scenes
|
||||
wf.x_mode = "timestamp"
|
||||
# Spy on the curve_manager
|
||||
refresh_spy = MagicMock()
|
||||
curve_setting.curve_manager.refresh_from_waveform = refresh_spy
|
||||
|
||||
# Call refresh
|
||||
curve_setting.refresh()
|
||||
|
||||
refresh_spy.assert_called_once()
|
||||
# The combo should now read "timestamp"
|
||||
assert curve_setting.mode_combo.currentText() == "timestamp"
|
||||
|
||||
|
||||
##################################################
|
||||
# CurveTree
|
||||
##################################################
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def curve_tree_fixture(qtbot, mocked_client_with_dap):
|
||||
"""
|
||||
Creates a CurveTree widget referencing a mocked or real Waveform.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client_with_dap)
|
||||
wf.color_palette = "magma"
|
||||
curve_tree = create_widget(qtbot, CurveTree, parent=None, waveform=wf)
|
||||
return curve_tree, wf
|
||||
|
||||
|
||||
def test_curve_tree_init(curve_tree_fixture):
|
||||
"""
|
||||
Test that the CurveTree initializes properly with references to the waveform,
|
||||
sets up the toolbar, and an empty QTreeWidget.
|
||||
"""
|
||||
curve_tree, wf = curve_tree_fixture
|
||||
assert curve_tree.waveform == wf
|
||||
assert curve_tree.color_palette == "magma"
|
||||
assert curve_tree.tree.columnCount() == 7
|
||||
|
||||
assert "add" in curve_tree.toolbar.widgets
|
||||
assert "expand_all" in curve_tree.toolbar.widgets
|
||||
assert "collapse_all" in curve_tree.toolbar.widgets
|
||||
assert "renormalize_colors" in curve_tree.toolbar.widgets
|
||||
|
||||
|
||||
def test_add_new_curve(curve_tree_fixture):
|
||||
"""
|
||||
Test that add_new_curve() adds a top-level item with a device curve config,
|
||||
assigns it a color from the buffer, and doesn't modify existing rows.
|
||||
"""
|
||||
curve_tree, wf = curve_tree_fixture
|
||||
curve_tree.color_buffer = ["#111111", "#222222", "#333333", "#444444", "#555555"]
|
||||
|
||||
assert curve_tree.tree.topLevelItemCount() == 0
|
||||
|
||||
with patch.object(curve_tree, "_ensure_color_buffer_size") as ensure_spy:
|
||||
new_item = curve_tree.add_new_curve(name="bpm4i", entry="bpm4i")
|
||||
ensure_spy.assert_called_once()
|
||||
|
||||
assert curve_tree.tree.topLevelItemCount() == 1
|
||||
last_item = curve_tree.all_items[-1]
|
||||
assert last_item is new_item
|
||||
assert new_item.config.source == "device"
|
||||
assert new_item.config.signal.name == "bpm4i"
|
||||
assert new_item.config.signal.entry == "bpm4i"
|
||||
assert new_item.config.color in curve_tree.color_buffer
|
||||
|
||||
|
||||
def test_renormalize_colors(curve_tree_fixture):
|
||||
"""
|
||||
Test that renormalize_colors overwrites colors for all items in creation order.
|
||||
"""
|
||||
curve_tree, wf = curve_tree_fixture
|
||||
# Add multiple curves
|
||||
c1 = curve_tree.add_new_curve(name="bpm4i", entry="bpm4i")
|
||||
c2 = curve_tree.add_new_curve(name="bpm3a", entry="bpm3a")
|
||||
curve_tree.color_buffer = []
|
||||
|
||||
set_color_spy_c1 = patch.object(c1.color_button, "set_color")
|
||||
set_color_spy_c2 = patch.object(c2.color_button, "set_color")
|
||||
|
||||
with set_color_spy_c1 as spy1, set_color_spy_c2 as spy2:
|
||||
curve_tree.renormalize_colors()
|
||||
spy1.assert_called_once()
|
||||
spy2.assert_called_once()
|
||||
|
||||
|
||||
def test_expand_collapse(curve_tree_fixture):
|
||||
"""
|
||||
Test expand_all_daps() and collapse_all_daps() calls expand/collapse on every top-level item.
|
||||
"""
|
||||
curve_tree, wf = curve_tree_fixture
|
||||
c1 = curve_tree.add_new_curve(name="bpm4i", entry="bpm4i")
|
||||
curve_tree.tree.expandAll()
|
||||
expand_spy = patch.object(curve_tree.tree, "expandItem")
|
||||
collapse_spy = patch.object(curve_tree.tree, "collapseItem")
|
||||
|
||||
with expand_spy as e_spy:
|
||||
curve_tree.expand_all_daps()
|
||||
e_spy.assert_called_once_with(c1)
|
||||
|
||||
with collapse_spy as c_spy:
|
||||
curve_tree.collapse_all_daps()
|
||||
c_spy.assert_called_once_with(c1)
|
||||
|
||||
|
||||
def test_send_curve_json(curve_tree_fixture, monkeypatch):
|
||||
"""
|
||||
Test that send_curve_json sets the waveform's color_palette and curve_json
|
||||
to the exported config from the tree.
|
||||
"""
|
||||
curve_tree, wf = curve_tree_fixture
|
||||
# Add multiple curves
|
||||
curve_tree.add_new_curve(name="bpm4i", entry="bpm4i")
|
||||
curve_tree.add_new_curve(name="bpm3a", entry="bpm3a")
|
||||
|
||||
curve_tree.color_palette = "viridis"
|
||||
curve_tree.send_curve_json()
|
||||
|
||||
assert wf.color_palette == "viridis"
|
||||
data = json.loads(wf.curve_json)
|
||||
assert len(data) == 2
|
||||
labels = [d["label"] for d in data]
|
||||
assert "bpm4i-bpm4i" in labels
|
||||
assert "bpm3a-bpm3a" in labels
|
||||
|
||||
|
||||
def test_refresh_from_waveform(qtbot, mocked_client_with_dap, monkeypatch):
|
||||
"""
|
||||
Test that refresh_from_waveform() rebuilds the tree from the waveform's curve_json
|
||||
"""
|
||||
patched_models = {"GaussianModel": {}, "LorentzModel": {}, "SineModel": {}}
|
||||
monkeypatch.setattr(mocked_client_with_dap.dap, "_available_dap_plugins", patched_models)
|
||||
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client_with_dap)
|
||||
wf.x_mode = "auto"
|
||||
curve_tree = create_widget(qtbot, CurveTree, parent=None, waveform=wf)
|
||||
|
||||
wf.plot(arg1="bpm4i", dap="GaussianModel")
|
||||
wf.plot(arg1="bpm3a", dap="GaussianModel")
|
||||
|
||||
# Clear the tree to simulate a fresh rebuild.
|
||||
curve_tree.tree.clear()
|
||||
curve_tree.all_items.clear()
|
||||
assert curve_tree.tree.topLevelItemCount() == 0
|
||||
|
||||
# For DAP rows
|
||||
curve_tree.refresh_from_waveform()
|
||||
assert curve_tree.tree.topLevelItemCount() == 2
|
||||
|
||||
|
||||
def test_add_dap_row(curve_tree_fixture):
|
||||
"""
|
||||
Test that add_dap_row creates a new DAP curve as a child of a device curve,
|
||||
with the correct configuration and parent-child relationship.
|
||||
"""
|
||||
curve_tree, wf = curve_tree_fixture
|
||||
|
||||
# Add a device curve first
|
||||
device_row = curve_tree.add_new_curve(name="bpm4i", entry="bpm4i")
|
||||
assert device_row.source == "device"
|
||||
assert curve_tree.tree.topLevelItemCount() == 1
|
||||
assert device_row.childCount() == 0
|
||||
|
||||
# Now add a DAP row to it
|
||||
device_row.add_dap_row()
|
||||
|
||||
# Check that child was added
|
||||
assert device_row.childCount() == 1
|
||||
dap_child = device_row.child(0)
|
||||
|
||||
# Verify the DAP child has the correct configuration
|
||||
assert dap_child.source == "dap"
|
||||
assert dap_child.config.parent_label == device_row.config.label
|
||||
|
||||
# Check that the DAP inherits device name/entry from parent
|
||||
assert dap_child.config.signal.name == "bpm4i"
|
||||
assert dap_child.config.signal.entry == "bpm4i"
|
||||
|
||||
# Check that the item is in the curve_tree's all_items list
|
||||
assert dap_child in curve_tree.all_items
|
||||
|
||||
|
||||
def test_remove_self_top_level(curve_tree_fixture):
|
||||
"""
|
||||
Test that remove_self removes a top-level device row from the tree.
|
||||
"""
|
||||
curve_tree, wf = curve_tree_fixture
|
||||
|
||||
# Add two device curves
|
||||
row1 = curve_tree.add_new_curve(name="bpm4i", entry="bpm4i")
|
||||
row2 = curve_tree.add_new_curve(name="bpm3a", entry="bpm3a")
|
||||
assert curve_tree.tree.topLevelItemCount() == 2
|
||||
assert len(curve_tree.all_items) == 2
|
||||
|
||||
# Remove the first row
|
||||
row1.remove_self()
|
||||
|
||||
# Check that only one row remains and it's the correct one
|
||||
assert curve_tree.tree.topLevelItemCount() == 1
|
||||
assert curve_tree.tree.topLevelItem(0) == row2
|
||||
assert len(curve_tree.all_items) == 1
|
||||
assert curve_tree.all_items[0] == row2
|
||||
|
||||
|
||||
def test_remove_self_child(curve_tree_fixture):
|
||||
"""
|
||||
Test that remove_self removes a child DAP row while preserving the parent device row.
|
||||
"""
|
||||
curve_tree, wf = curve_tree_fixture
|
||||
|
||||
# Add a device curve and a DAP child
|
||||
device_row = curve_tree.add_new_curve(name="bpm4i", entry="bpm4i")
|
||||
device_row.add_dap_row()
|
||||
dap_child = device_row.child(0)
|
||||
|
||||
assert curve_tree.tree.topLevelItemCount() == 1
|
||||
assert device_row.childCount() == 1
|
||||
assert len(curve_tree.all_items) == 2
|
||||
|
||||
# Remove the DAP child
|
||||
dap_child.remove_self()
|
||||
|
||||
# Check that the parent device row still exists but has no children
|
||||
assert curve_tree.tree.topLevelItemCount() == 1
|
||||
assert device_row.childCount() == 0
|
||||
assert len(curve_tree.all_items) == 1
|
||||
assert curve_tree.all_items[0] == device_row
|
||||
|
||||
|
||||
def test_export_data_dap(curve_tree_fixture):
|
||||
"""
|
||||
Test that export_data from a DAP row correctly includes parent relationship and DAP model.
|
||||
"""
|
||||
curve_tree, wf = curve_tree_fixture
|
||||
|
||||
# Add a device curve with specific parameters
|
||||
device_row = curve_tree.add_new_curve(name="bpm4i", entry="bpm4i")
|
||||
device_row.config.label = "bpm4i-main"
|
||||
|
||||
# Add a DAP child
|
||||
device_row.add_dap_row()
|
||||
dap_child = device_row.child(0)
|
||||
|
||||
# Set a specific model in the DAP combobox
|
||||
dap_child.dap_combo.fit_model_combobox.setCurrentText("GaussianModel")
|
||||
|
||||
# Export data from the DAP row
|
||||
exported = dap_child.export_data()
|
||||
|
||||
# Check the exported data
|
||||
assert exported["source"] == "dap"
|
||||
assert exported["parent_label"] == "bpm4i-main"
|
||||
assert exported["signal"]["name"] == "bpm4i"
|
||||
assert exported["signal"]["entry"] == "bpm4i"
|
||||
assert exported["signal"]["dap"] == "GaussianModel"
|
||||
assert exported["label"] == "bpm4i-main-GaussianModel"
|
787
tests/unit_tests/test_waveform_next_gen.py
Normal file
787
tests/unit_tests/test_waveform_next_gen.py
Normal file
@ -0,0 +1,787 @@
|
||||
import json
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import numpy as np
|
||||
import pyqtgraph as pg
|
||||
import pytest
|
||||
from pyqtgraph.graphicsItems.DateAxisItem import DateAxisItem
|
||||
|
||||
from bec_widgets.widgets.plots_next_gen.plot_base import UIMode
|
||||
from bec_widgets.widgets.plots_next_gen.waveform.curve import DeviceSignal
|
||||
from bec_widgets.widgets.plots_next_gen.waveform.waveform import Waveform
|
||||
from tests.unit_tests.client_mocks import dap_plugin_message, mocked_client, mocked_client_with_dap
|
||||
|
||||
from .conftest import create_widget
|
||||
|
||||
##################################################
|
||||
# Waveform widget base functionality tests
|
||||
##################################################
|
||||
|
||||
|
||||
def test_waveform_initialization(qtbot, mocked_client):
|
||||
"""
|
||||
Test that a new Waveform widget initializes with the correct defaults.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
assert wf.objectName() == "Waveform"
|
||||
# Inherited from PlotBase
|
||||
assert wf.title == ""
|
||||
assert wf.x_label == ""
|
||||
assert wf.y_label == ""
|
||||
# No crosshair or FPS monitor by default
|
||||
assert wf.crosshair is None
|
||||
assert wf.fps_monitor is None
|
||||
# No curves initially
|
||||
assert len(wf.plot_item.curves) == 0
|
||||
|
||||
|
||||
def test_waveform_with_side_menu(qtbot, mocked_client):
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client, popups=False)
|
||||
|
||||
assert wf.ui_mode == UIMode.SIDE
|
||||
|
||||
|
||||
def test_plot_custom_curve(qtbot, mocked_client):
|
||||
"""
|
||||
Test that calling plot with explicit x and y data creates a custom curve.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
curve = wf.plot(x=[1, 2, 3], y=[4, 5, 6], label="custom_curve")
|
||||
assert curve is not None
|
||||
assert curve.config.source == "custom"
|
||||
assert curve.config.label == "custom_curve"
|
||||
x_data, y_data = curve.get_data()
|
||||
np.testing.assert_array_equal(x_data, np.array([1, 2, 3]))
|
||||
np.testing.assert_array_equal(y_data, np.array([4, 5, 6]))
|
||||
|
||||
|
||||
def test_plot_single_arg_input_1d(qtbot, mocked_client):
|
||||
"""
|
||||
Test that when a single 1D numpy array is passed, the curve is created with
|
||||
x-data as a generated index.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
data = np.array([10, 20, 30])
|
||||
curve = wf.plot(data, label="curve_1d")
|
||||
x_data, y_data = curve.get_data()
|
||||
np.testing.assert_array_equal(x_data, np.arange(len(data)))
|
||||
np.testing.assert_array_equal(y_data, data)
|
||||
|
||||
|
||||
def test_plot_single_arg_input_2d(qtbot, mocked_client):
|
||||
"""
|
||||
Test that when a single 2D numpy array (N x 2) is passed,
|
||||
x and y data are extracted from the first and second columns.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
data = np.array([[1, 4], [2, 5], [3, 6]])
|
||||
curve = wf.plot(data, label="curve_2d")
|
||||
x_data, y_data = curve.get_data()
|
||||
np.testing.assert_array_equal(x_data, data[:, 0])
|
||||
np.testing.assert_array_equal(y_data, data[:, 1])
|
||||
|
||||
|
||||
def test_plot_single_arg_input_sync(qtbot, mocked_client):
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
|
||||
c1 = wf.plot(arg1="bpm4i")
|
||||
c2 = wf.plot(arg1="bpm3a")
|
||||
|
||||
assert c1.config.source == "device"
|
||||
assert c2.config.source == "device"
|
||||
assert c1.config.signal == DeviceSignal(name="bpm4i", entry="bpm4i", dap=None)
|
||||
assert c2.config.signal == DeviceSignal(name="bpm3a", entry="bpm3a", dap=None)
|
||||
|
||||
# Check that the curve is added to the plot
|
||||
assert len(wf.plot_item.curves) == 2
|
||||
|
||||
|
||||
def test_plot_single_arg_input_async(qtbot, mocked_client):
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
|
||||
c1 = wf.plot(arg1="eiger")
|
||||
c2 = wf.plot(arg1="async_device")
|
||||
|
||||
assert c1.config.source == "device"
|
||||
assert c2.config.source == "device"
|
||||
assert c1.config.signal == DeviceSignal(name="eiger", entry="eiger", dap=None)
|
||||
assert c2.config.signal == DeviceSignal(name="async_device", entry="async_device", dap=None)
|
||||
|
||||
# Check that the curve is added to the plot
|
||||
assert len(wf.plot_item.curves) == 2
|
||||
|
||||
|
||||
def test_curve_access_pattern(qtbot, mocked_client):
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
|
||||
c1 = wf.plot(arg1="bpm4i")
|
||||
c2 = wf.plot(arg1="bpm3a")
|
||||
|
||||
# Check that the curve is added to the plot
|
||||
assert len(wf.plot_item.curves) == 2
|
||||
|
||||
# Check that the curve is accessible by label
|
||||
assert wf.get_curve("bpm4i-bpm4i") == c1
|
||||
assert wf.get_curve("bpm3a-bpm3a") == c2
|
||||
|
||||
# Check that the curve is accessible by index
|
||||
assert wf.get_curve(0) == c1
|
||||
assert wf.get_curve(1) == c2
|
||||
|
||||
# Check that the curve is accessible by label
|
||||
assert wf["bpm4i-bpm4i"] == c1
|
||||
assert wf["bpm3a-bpm3a"] == c2
|
||||
assert wf[0] == c1
|
||||
assert wf[1] == c2
|
||||
|
||||
assert wf.curves[0] == c1
|
||||
assert wf.curves[1] == c2
|
||||
|
||||
|
||||
def test_find_curve_by_label(qtbot, mocked_client):
|
||||
"""
|
||||
Test the _find_curve_by_label method returns the correct curve or None if not found.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
c1 = wf.plot(arg1="bpm4i", label="c1_label")
|
||||
c2 = wf.plot(arg1="bpm3a", label="c2_label")
|
||||
|
||||
found = wf._find_curve_by_label("c1_label")
|
||||
assert found == c1, "Should return the first curve"
|
||||
missing = wf._find_curve_by_label("bogus_label")
|
||||
assert missing is None, "Should return None if not found"
|
||||
|
||||
|
||||
def test_set_x_mode(qtbot, mocked_client):
|
||||
"""
|
||||
Test that setting x_mode updates the internal x-axis mode state and switches
|
||||
the bottom axis of the plot.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
wf.x_mode = "timestamp"
|
||||
assert wf.x_axis_mode["name"] == "timestamp"
|
||||
# When x_mode is 'timestamp', the bottom axis should be a DateAxisItem.
|
||||
assert isinstance(wf.plot_item.axes["bottom"]["item"], DateAxisItem)
|
||||
|
||||
wf.x_mode = "index"
|
||||
# For other modes, the bottom axis becomes the default AxisItem.
|
||||
assert isinstance(wf.plot_item.axes["bottom"]["item"], pg.AxisItem)
|
||||
|
||||
wf.x_mode = "samx"
|
||||
assert wf.x_axis_mode["name"] == "samx"
|
||||
assert isinstance(wf.plot_item.axes["bottom"]["item"], pg.AxisItem)
|
||||
|
||||
|
||||
def test_color_palette_update(qtbot, mocked_client):
|
||||
"""
|
||||
Test that updating the color_palette property changes the color of existing curves.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
curve = wf.plot(x=[1, 2, 3], y=[4, 5, 6], label="test_curve")
|
||||
original_color = curve.config.color
|
||||
# Change to a different valid palette
|
||||
wf.color_palette = "plasma"
|
||||
assert wf.config.color_palette == "plasma"
|
||||
# After updating the palette, the curve's color should be re-generated.
|
||||
assert curve.config.color != original_color
|
||||
|
||||
|
||||
def test_curve_json_property(qtbot, mocked_client):
|
||||
"""
|
||||
Test that the curve_json property returns a JSON string representing
|
||||
non-custom curves. Since custom curves are not serialized, if only a custom
|
||||
curve is added, an empty list should be returned.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
wf.plot(x=[1, 2, 3], y=[4, 5, 6], label="custom_curve")
|
||||
json_str = wf.curve_json
|
||||
data = json.loads(json_str)
|
||||
assert isinstance(data, list)
|
||||
# Only custom curves exist so none should be serialized.
|
||||
assert len(data) == 0
|
||||
|
||||
|
||||
def test_remove_curve_waveform(qtbot, mocked_client):
|
||||
"""
|
||||
Test that curves can be removed from the waveform using either their label or index.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
wf.plot(x=[1, 2, 3], y=[4, 5, 6], label="curve1")
|
||||
wf.plot(x=[4, 5, 6], y=[7, 8, 9], label="curve2")
|
||||
num_before = len(wf.plot_item.curves)
|
||||
wf.remove_curve("curve1")
|
||||
num_after = len(wf.plot_item.curves)
|
||||
assert num_after == num_before - 1
|
||||
|
||||
wf.remove_curve(0)
|
||||
assert len(wf.plot_item.curves) == num_after - 1
|
||||
|
||||
|
||||
def test_get_all_data_empty(qtbot, mocked_client):
|
||||
"""
|
||||
Test that get_all_data returns an empty dictionary when no curves have been added.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
all_data = wf.get_all_data(output="dict")
|
||||
assert all_data == {}
|
||||
|
||||
|
||||
def test_get_all_data_dict(qtbot, mocked_client):
|
||||
"""
|
||||
Test that get_all_data returns a dictionary with the expected x and y data for each curve.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
wf.plot(x=[1, 2, 3], y=[4, 5, 6], label="curve1")
|
||||
wf.plot(x=[7, 8, 9], y=[10, 11, 12], label="curve2")
|
||||
|
||||
all_data = wf.get_all_data(output="dict")
|
||||
|
||||
expected = {
|
||||
"curve1": {"x": [1, 2, 3], "y": [4, 5, 6]},
|
||||
"curve2": {"x": [7, 8, 9], "y": [10, 11, 12]},
|
||||
}
|
||||
assert all_data == expected
|
||||
|
||||
|
||||
def test_curve_json_getter_setter(qtbot, mocked_client):
|
||||
"""
|
||||
Test that the curve_json getter returns a JSON string representing device curves
|
||||
and that setting curve_json re-creates the curves.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
|
||||
# These curves should be in JSON
|
||||
wf.plot(arg1="bpm4i")
|
||||
wf.plot(arg1="bpm3a")
|
||||
# Custom curves should be ignored
|
||||
wf.plot(x=[1, 2, 3], y=[4, 5, 6], label="custom_curve")
|
||||
wf.plot([1, 2, 3, 4])
|
||||
|
||||
# Get JSON from the getter.
|
||||
json_str = wf.curve_json
|
||||
curve_configs = json.loads(json_str)
|
||||
# Only device curves are serialized; expect two configurations.
|
||||
assert isinstance(curve_configs, list)
|
||||
assert len(curve_configs) == 2
|
||||
labels = [cfg["label"] for cfg in curve_configs]
|
||||
assert "bpm4i-bpm4i" in labels
|
||||
assert "bpm3a-bpm3a" in labels
|
||||
|
||||
# Clear all curves.
|
||||
wf.clear_all()
|
||||
assert len(wf.plot_item.curves) == 0
|
||||
|
||||
# Use the JSON setter to re-create the curves.
|
||||
wf.curve_json = json_str
|
||||
# After setting, the waveform should have two curves.
|
||||
assert len(wf.plot_item.curves) == 2
|
||||
new_labels = [curve.name() for curve in wf.plot_item.curves]
|
||||
for lab in labels:
|
||||
assert lab in new_labels
|
||||
|
||||
|
||||
def test_curve_json_setter_ignores_custom(qtbot, mocked_client):
|
||||
"""
|
||||
Test that when curve_json setter is given a JSON string containing a
|
||||
curve with source "custom", that curve is not added.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
|
||||
device_curve_config = {
|
||||
"widget_class": "Curve",
|
||||
"parent_id": wf.gui_id,
|
||||
"label": "device_curve",
|
||||
"color": "#ff0000",
|
||||
"source": "device",
|
||||
"signal": {"name": "bpm4i", "entry": "bpm4i", "dap": None},
|
||||
}
|
||||
custom_curve_config = {
|
||||
"widget_class": "Curve",
|
||||
"parent_id": wf.gui_id,
|
||||
"label": "custom_curve",
|
||||
"color": "#00ff00",
|
||||
"source": "custom",
|
||||
# No signal for custom curves.
|
||||
}
|
||||
json_str = json.dumps([device_curve_config, custom_curve_config], indent=2)
|
||||
wf.curve_json = json_str
|
||||
# Only the device curve should be added.
|
||||
curves = wf.plot_item.curves
|
||||
assert len(curves) == 1
|
||||
assert curves[0].name() == "device_curve"
|
||||
|
||||
|
||||
##################################################
|
||||
# Waveform widget scan logic tests
|
||||
##################################################
|
||||
|
||||
|
||||
class DummyData:
|
||||
def __init__(self, val, timestamps):
|
||||
self.val = val
|
||||
self.timestamps = timestamps
|
||||
|
||||
def get(self, key, default=None):
|
||||
if key == "val":
|
||||
return self.val
|
||||
return default
|
||||
|
||||
|
||||
def create_dummy_scan_item():
|
||||
"""
|
||||
Helper to create a dummy scan item with both live_data and metadata/status_message info.
|
||||
"""
|
||||
dummy_live_data = {
|
||||
"samx": {"samx": DummyData(val=[10, 20, 30], timestamps=[100, 200, 300])},
|
||||
"bpm4i": {"bpm4i": DummyData(val=[5, 6, 7], timestamps=[101, 201, 301])},
|
||||
"async_device": {"async_device": DummyData(val=[1, 2, 3], timestamps=[11, 21, 31])},
|
||||
}
|
||||
dummy_scan = MagicMock()
|
||||
dummy_scan.live_data = dummy_live_data
|
||||
dummy_scan.metadata = {
|
||||
"bec": {
|
||||
"scan_id": "dummy",
|
||||
"scan_report_devices": ["samx"],
|
||||
"readout_priority": {"monitored": ["bpm4i"], "async": ["async_device"]},
|
||||
}
|
||||
}
|
||||
dummy_scan.status_message = MagicMock()
|
||||
dummy_scan.status_message.info = {
|
||||
"readout_priority": {"monitored": ["bpm4i"], "async": ["async_device"]},
|
||||
"scan_report_devices": ["samx"],
|
||||
}
|
||||
return dummy_scan
|
||||
|
||||
|
||||
def test_update_sync_curves(monkeypatch, qtbot, mocked_client):
|
||||
"""
|
||||
Test that update_sync_curves retrieves live data correctly and calls setData on sync curves.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
c = wf.plot(arg1="bpm4i")
|
||||
wf._sync_curves = [c]
|
||||
wf.x_mode = "timestamp"
|
||||
dummy_scan = create_dummy_scan_item()
|
||||
wf.scan_item = dummy_scan
|
||||
|
||||
recorded = {}
|
||||
|
||||
def fake_setData(x, y):
|
||||
recorded["x"] = x
|
||||
recorded["y"] = y
|
||||
|
||||
monkeypatch.setattr(c, "setData", fake_setData)
|
||||
|
||||
wf.update_sync_curves()
|
||||
np.testing.assert_array_equal(recorded.get("x"), [101, 201, 301])
|
||||
np.testing.assert_array_equal(recorded.get("y"), [5, 6, 7])
|
||||
|
||||
|
||||
def test_update_async_curves(monkeypatch, qtbot, mocked_client):
|
||||
"""
|
||||
Test that update_async_curves retrieves live data correctly and calls setData on async curves.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
c = wf.plot(arg1="async_device", label="async_device-async_device")
|
||||
wf._async_curves = [c]
|
||||
wf.x_mode = "timestamp"
|
||||
dummy_scan = create_dummy_scan_item()
|
||||
wf.scan_item = dummy_scan
|
||||
|
||||
recorded = {}
|
||||
|
||||
def fake_setData(x, y):
|
||||
recorded["x"] = x
|
||||
recorded["y"] = y
|
||||
|
||||
monkeypatch.setattr(c, "setData", fake_setData)
|
||||
|
||||
wf.update_async_curves()
|
||||
np.testing.assert_array_equal(recorded.get("x"), [11, 21, 31])
|
||||
np.testing.assert_array_equal(recorded.get("y"), [1, 2, 3])
|
||||
|
||||
|
||||
def test_get_x_data_custom(monkeypatch, qtbot, mocked_client):
|
||||
"""
|
||||
Test that _get_x_data returns the correct custom signal data.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
# Set x_mode to a custom mode.
|
||||
wf.x_axis_mode["name"] = "custom_signal"
|
||||
wf.x_axis_mode["entry"] = "custom_entry"
|
||||
dummy_data = DummyData(val=[50, 60, 70], timestamps=[150, 160, 170])
|
||||
dummy_live = {"custom_signal": {"custom_entry": dummy_data}}
|
||||
monkeypatch.setattr(wf, "_fetch_scan_data_and_access", lambda: (dummy_live, "val"))
|
||||
x_data = wf._get_x_data("irrelevant", "irrelevant")
|
||||
np.testing.assert_array_equal(x_data, [50, 60, 70])
|
||||
|
||||
|
||||
def test_get_x_data_timestamp(monkeypatch, qtbot, mocked_client):
|
||||
"""
|
||||
Test that _get_x_data returns the correct timestamp data.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
wf.x_axis_mode["name"] = "timestamp"
|
||||
dummy_data = DummyData(val=[50, 60, 70], timestamps=[101, 202, 303])
|
||||
dummy_live = {"deviceX": {"entryX": dummy_data}}
|
||||
monkeypatch.setattr(wf, "_fetch_scan_data_and_access", lambda: (dummy_live, "val"))
|
||||
x_data = wf._get_x_data("deviceX", "entryX")
|
||||
np.testing.assert_array_equal(x_data, [101, 202, 303])
|
||||
|
||||
|
||||
def test_categorise_device_curves(monkeypatch, qtbot, mocked_client):
|
||||
"""
|
||||
Test that _categorise_device_curves correctly categorizes curves.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
dummy_scan = create_dummy_scan_item()
|
||||
wf.scan_item = dummy_scan
|
||||
|
||||
c_sync = wf.plot(arg1="bpm4i", label="bpm4i-bpm4i")
|
||||
c_async = wf.plot(arg1="async_device", label="async_device-async_device")
|
||||
|
||||
mode = wf._categorise_device_curves()
|
||||
|
||||
assert mode == "mixed"
|
||||
assert c_sync in wf._sync_curves
|
||||
assert c_async in wf._async_curves
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
["mode", "calls"], [("sync", (1, 0)), ("async", (0, 1)), ("mixed", (1, 1))]
|
||||
)
|
||||
def test_on_scan_status(qtbot, mocked_client, monkeypatch, mode, calls):
|
||||
"""
|
||||
Test that on_scan_status sets up a new scan correctly,
|
||||
categorizes curves, and triggers sync/async updates as needed.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
# Force creation of a couple of device curves
|
||||
if mode == "sync":
|
||||
wf.plot(arg1="bpm4i")
|
||||
elif mode == "async":
|
||||
wf.plot(arg1="async_device")
|
||||
else:
|
||||
wf.plot(arg1="bpm4i")
|
||||
wf.plot(arg1="async_device")
|
||||
|
||||
# We mock out the scan_item, pretending we found a new scan.
|
||||
dummy_scan = create_dummy_scan_item()
|
||||
dummy_scan.metadata["bec"]["scan_id"] = "1234"
|
||||
monkeypatch.setattr(wf.queue.scan_storage, "find_scan_by_ID", lambda scan_id: dummy_scan)
|
||||
|
||||
# We'll track calls to sync_signal_update and async_signal_update
|
||||
sync_spy = MagicMock()
|
||||
async_spy = MagicMock()
|
||||
wf.sync_signal_update.connect(sync_spy)
|
||||
wf.async_signal_update.connect(async_spy)
|
||||
|
||||
# Prepare fake message data
|
||||
msg = {"scan_id": "1234"}
|
||||
meta = {}
|
||||
wf.on_scan_status(msg, meta)
|
||||
|
||||
assert wf.scan_id == "1234"
|
||||
assert wf.scan_item == dummy_scan
|
||||
assert wf._mode == mode
|
||||
|
||||
assert sync_spy.call_count == calls[0], "sync_signal_update should be called exactly once"
|
||||
assert async_spy.call_count == calls[1], "async_signal_update should be called exactly once"
|
||||
|
||||
|
||||
def test_add_dap_curve(qtbot, mocked_client_with_dap, monkeypatch):
|
||||
"""
|
||||
Test add_dap_curve creates a new DAP curve from an existing device curve
|
||||
and verifies that the DAP call doesn't fail due to mock-based plugin_info.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client_with_dap)
|
||||
wf.plot(arg1="bpm4i", label="bpm4i-bpm4i")
|
||||
|
||||
dap_curve = wf.add_dap_curve(device_label="bpm4i-bpm4i", dap_name="GaussianModel")
|
||||
assert dap_curve is not None
|
||||
assert dap_curve.config.source == "dap"
|
||||
assert dap_curve.config.signal.name == "bpm4i"
|
||||
assert dap_curve.config.signal.dap == "GaussianModel"
|
||||
|
||||
|
||||
def test_fetch_scan_data_and_access(qtbot, mocked_client, monkeypatch):
|
||||
"""
|
||||
Test the _fetch_scan_data_and_access method returns live_data/val if in a live scan,
|
||||
or device dict/value if in a historical scan. Also test fallback if no scan_item.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
|
||||
wf.scan_item = None
|
||||
|
||||
hist_mock = MagicMock()
|
||||
monkeypatch.setattr(wf, "update_with_scan_history", hist_mock)
|
||||
|
||||
wf._fetch_scan_data_and_access()
|
||||
hist_mock.assert_called_once_with(-1)
|
||||
|
||||
# Ckeck live mode
|
||||
dummy_scan = create_dummy_scan_item()
|
||||
wf.scan_item = dummy_scan
|
||||
data_dict, access_key = wf._fetch_scan_data_and_access()
|
||||
assert data_dict == dummy_scan.live_data
|
||||
assert access_key == "val"
|
||||
|
||||
# Check history mode
|
||||
del dummy_scan.live_data
|
||||
dummy_scan.devices = {"some_device": {"some_entry": "some_value"}}
|
||||
data_dict, access_key = wf._fetch_scan_data_and_access()
|
||||
assert "some_device" in data_dict # from dummy_scan.devices
|
||||
assert access_key == "value"
|
||||
|
||||
|
||||
def test_setup_async_curve(qtbot, mocked_client, monkeypatch):
|
||||
"""
|
||||
Test that _setup_async_curve properly disconnects old signals
|
||||
and re-connects the async readback for a new scan ID.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
wf.old_scan_id = "111"
|
||||
wf.scan_id = "222"
|
||||
|
||||
c = wf.plot(arg1="async_device", label="async_device-async_device")
|
||||
# check that it was placed in _async_curves or so
|
||||
wf._async_curves = [c]
|
||||
|
||||
# We'll spy on connect_slot
|
||||
connect_spy = MagicMock()
|
||||
monkeypatch.setattr(wf.bec_dispatcher, "connect_slot", connect_spy)
|
||||
|
||||
wf._setup_async_curve(c)
|
||||
connect_spy.assert_called_once()
|
||||
endpoint_called = connect_spy.call_args[0][1].endpoint
|
||||
# We expect MessageEndpoints.device_async_readback('222', 'async_device')
|
||||
assert "222" in endpoint_called
|
||||
assert "async_device" in endpoint_called
|
||||
|
||||
|
||||
@pytest.mark.parametrize("x_mode", ("timestamp", "index"))
|
||||
def test_on_async_readback(qtbot, mocked_client, x_mode):
|
||||
"""
|
||||
Test that on_async_readback extends or replaces async data depending on metadata instruction.
|
||||
For 'timestamp' mode, new timestamps are appended to x_data.
|
||||
For 'index' mode, x_data simply increases by integer index.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
dummy_scan = create_dummy_scan_item()
|
||||
wf.scan_item = dummy_scan
|
||||
|
||||
c = wf.plot(arg1="async_device", label="async_device-async_device")
|
||||
wf._async_curves = [c]
|
||||
# Suppose existing data
|
||||
c.setData([0, 1, 2], [10, 11, 12])
|
||||
|
||||
# Set the x_axis_mode
|
||||
wf.x_axis_mode["name"] = x_mode
|
||||
|
||||
# Extend readback
|
||||
msg = {"signals": {"async_device": {"value": [100, 200], "timestamp": [1001, 1002]}}}
|
||||
metadata = {"async_update": {"max_shape": [None], "type": "add"}}
|
||||
wf.on_async_readback(msg, metadata)
|
||||
|
||||
x_data, y_data = c.get_data()
|
||||
assert len(x_data) == 5
|
||||
# Check x_data based on x_mode
|
||||
if x_mode == "timestamp":
|
||||
np.testing.assert_array_equal(x_data, [0, 1, 2, 1001, 1002])
|
||||
else: # x_mode == "index"
|
||||
np.testing.assert_array_equal(x_data, [0, 1, 2, 3, 4])
|
||||
|
||||
np.testing.assert_array_equal(y_data, [10, 11, 12, 100, 200])
|
||||
|
||||
# instruction='replace'
|
||||
msg2 = {"signals": {"async_device": {"value": [999], "timestamp": [555]}}}
|
||||
metadata2 = {"async_update": {"max_shape": [None], "type": "replace"}}
|
||||
wf.on_async_readback(msg2, metadata2)
|
||||
x_data2, y_data2 = c.get_data()
|
||||
if x_mode == "timestamp":
|
||||
np.testing.assert_array_equal(x_data2, [555])
|
||||
else:
|
||||
|
||||
np.testing.assert_array_equal(x_data2, [0])
|
||||
|
||||
np.testing.assert_array_equal(y_data2, [999])
|
||||
|
||||
|
||||
def test_get_x_data(qtbot, mocked_client, monkeypatch):
|
||||
"""
|
||||
Test _get_x_data logic for multiple modes: 'timestamp', 'index', 'custom', 'auto'.
|
||||
Use a dummy scan_item that returns specific data for the requested signal.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
dummy_scan = create_dummy_scan_item()
|
||||
wf.scan_item = dummy_scan
|
||||
|
||||
# 1) x_mode == 'timestamp'
|
||||
wf.x_axis_mode["name"] = "timestamp"
|
||||
x_data = wf._get_x_data("bpm4i", "bpm4i")
|
||||
np.testing.assert_array_equal(x_data, [101, 201, 301])
|
||||
|
||||
# 2) x_mode == 'index' => returns None => means use Y data indexing
|
||||
wf.x_axis_mode["name"] = "index"
|
||||
x_data2 = wf._get_x_data("bpm4i", "bpm4i")
|
||||
assert x_data2 is None
|
||||
|
||||
# 3) custom x => e.g. "samx"
|
||||
wf.x_axis_mode["name"] = "samx"
|
||||
x_custom = wf._get_x_data("bpm4i", "bpm4i")
|
||||
# because dummy_scan.live_data["samx"]["samx"].val => [10,20,30]
|
||||
np.testing.assert_array_equal(x_custom, [10, 20, 30])
|
||||
|
||||
# 4) auto
|
||||
wf._async_curves.clear()
|
||||
wf._sync_curves = [MagicMock()] # pretend we have a sync device
|
||||
wf.x_axis_mode["name"] = "auto"
|
||||
x_auto = wf._get_x_data("bpm4i", "bpm4i")
|
||||
# By default it tries the "scan_report_devices" => "samx" => same as custom above
|
||||
np.testing.assert_array_equal(x_auto, [10, 20, 30])
|
||||
|
||||
|
||||
##################################################
|
||||
# The following tests are for the Curve class
|
||||
##################################################
|
||||
|
||||
|
||||
def test_curve_set_appearance_methods(qtbot, mocked_client):
|
||||
"""
|
||||
Test that the Curve appearance setter methods update the configuration properly.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
c = wf.plot(x=[1, 2, 3], y=[4, 5, 6], label="appearance_curve")
|
||||
c.set_color("#0000ff")
|
||||
c.set_symbol("x")
|
||||
c.set_symbol_color("#ff0000")
|
||||
c.set_symbol_size(10)
|
||||
c.set_pen_width(3)
|
||||
c.set_pen_style("dashdot")
|
||||
assert c.config.color == "#0000ff"
|
||||
assert c.config.symbol == "x"
|
||||
assert c.config.symbol_color == "#ff0000"
|
||||
assert c.config.symbol_size == 10
|
||||
assert c.config.pen_width == 3
|
||||
assert c.config.pen_style == "dashdot"
|
||||
|
||||
|
||||
def test_curve_set_custom_data(qtbot, mocked_client):
|
||||
"""
|
||||
Test that custom curves allow setting new data via set_data.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
c = wf.plot(x=[1, 2, 3], y=[4, 5, 6], label="custom_data_curve")
|
||||
# Change data
|
||||
c.set_data([7, 8, 9], [10, 11, 12])
|
||||
x_data, y_data = c.get_data()
|
||||
np.testing.assert_array_equal(x_data, np.array([7, 8, 9]))
|
||||
np.testing.assert_array_equal(y_data, np.array([10, 11, 12]))
|
||||
|
||||
|
||||
def test_curve_set_data_error_non_custom(qtbot, mocked_client):
|
||||
"""
|
||||
Test that calling set_data on a non-custom (device) curve raises a ValueError.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
# Create a device curve by providing y_name (which makes source 'device')
|
||||
# Assume that entry_validator returns a valid entry.
|
||||
c = wf.plot(arg1="bpm4i", label="device_curve")
|
||||
with pytest.raises(ValueError):
|
||||
c.set_data([1, 2, 3], [4, 5, 6])
|
||||
|
||||
|
||||
def test_curve_remove(qtbot, mocked_client):
|
||||
"""
|
||||
Test that calling remove() on a Curve calls its parent's remove_curve method.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
c1 = wf.plot(x=[1, 2, 3], y=[4, 5, 6], label="curve_1")
|
||||
c2 = wf.plot(x=[1, 2, 3], y=[4, 5, 6], label="curve_2")
|
||||
|
||||
assert len(wf.plot_item.curves) == 2
|
||||
c1.remove()
|
||||
assert len(wf.plot_item.curves) == 1
|
||||
assert c1 not in wf.plot_item.curves
|
||||
assert c2 in wf.plot_item.curves
|
||||
|
||||
|
||||
def test_curve_dap_params_and_summary(qtbot, mocked_client):
|
||||
"""
|
||||
Test that dap_params and dap_summary properties work as expected.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
c = wf.plot(x=[1, 2, 3], y=[4, 5, 6], label="dap_curve")
|
||||
c.dap_params = {"param": 1}
|
||||
c.dap_summary = {"summary": "test"}
|
||||
assert c.dap_params == {"param": 1}
|
||||
assert c.dap_summary == {"summary": "test"}
|
||||
|
||||
|
||||
def test_curve_set_method(qtbot, mocked_client):
|
||||
"""
|
||||
Test the convenience set(...) method of the Curve for updating appearance properties.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
c = wf.plot(x=[1, 2, 3], y=[4, 5, 6], label="set_method_curve")
|
||||
c.set(
|
||||
color="#123456",
|
||||
symbol="d",
|
||||
symbol_color="#654321",
|
||||
symbol_size=12,
|
||||
pen_width=5,
|
||||
pen_style="dot",
|
||||
)
|
||||
assert c.config.color == "#123456"
|
||||
assert c.config.symbol == "d"
|
||||
assert c.config.symbol_color == "#654321"
|
||||
assert c.config.symbol_size == 12
|
||||
assert c.config.pen_width == 5
|
||||
assert c.config.pen_style == "dot"
|
||||
|
||||
|
||||
##################################################
|
||||
# Settings and popups
|
||||
##################################################
|
||||
|
||||
|
||||
def test_show_curve_settings_popup(qtbot, mocked_client):
|
||||
"""
|
||||
Test that show_curve_settings_popup displays the settings dialog and toggles the toolbar icon.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
|
||||
curve_action = wf.toolbar.widgets["curve"].action
|
||||
assert not curve_action.isChecked(), "Should start unchecked"
|
||||
|
||||
wf.show_curve_settings_popup()
|
||||
|
||||
assert wf.curve_settings_dialog is not None
|
||||
assert wf.curve_settings_dialog.isVisible()
|
||||
assert curve_action.isChecked()
|
||||
|
||||
wf.curve_settings_dialog.close()
|
||||
assert wf.curve_settings_dialog is None
|
||||
assert not curve_action.isChecked(), "Should be unchecked after closing dialog"
|
||||
|
||||
|
||||
def test_show_dap_summary_popup(qtbot, mocked_client):
|
||||
"""
|
||||
Test that show_dap_summary_popup displays the DAP summary dialog and toggles the 'fit_params' toolbar icon.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client, popups=True)
|
||||
|
||||
assert "fit_params" in wf.toolbar.widgets
|
||||
|
||||
fit_action = wf.toolbar.widgets["fit_params"].action
|
||||
assert fit_action.isChecked() is False
|
||||
|
||||
wf.show_dap_summary_popup()
|
||||
|
||||
assert wf.dap_summary_dialog is not None
|
||||
assert wf.dap_summary_dialog.isVisible()
|
||||
assert fit_action.isChecked() is True
|
||||
|
||||
wf.dap_summary_dialog.close()
|
||||
assert wf.dap_summary_dialog is None
|
||||
assert fit_action.isChecked() is False
|
Reference in New Issue
Block a user