From 5a5d32312b08e1edeb69243daddfaaa9bac22273 Mon Sep 17 00:00:00 2001 From: wyzula-jan Date: Thu, 17 Jul 2025 17:06:50 +0200 Subject: [PATCH] test(waveform,curve_tree): test extended to cover history curve behaviour --- tests/unit_tests/client_mocks.py | 16 ++ tests/unit_tests/conftest.py | 124 +++++++++++- tests/unit_tests/test_curve_settings.py | 57 +++++- tests/unit_tests/test_waveform_next_gen.py | 224 ++++++++++++++++++++- 4 files changed, 409 insertions(+), 12 deletions(-) diff --git a/tests/unit_tests/client_mocks.py b/tests/unit_tests/client_mocks.py index f8d9ca81..b64a65ee 100644 --- a/tests/unit_tests/client_mocks.py +++ b/tests/unit_tests/client_mocks.py @@ -7,6 +7,7 @@ import pytest from bec_lib.bec_service import messages from bec_lib.endpoints import MessageEndpoints from bec_lib.redis_connector import RedisConnector +from bec_lib.scan_history import ScanHistory from bec_widgets.tests.utils import DEVICES, DMMock, FakePositioner, Positioner @@ -238,3 +239,18 @@ def create_dummy_scan_item(): "scan_report_devices": ["samx"], } return dummy_scan + + +def inject_scan_history(widget, scan_history_factory, *history_args): + """ + Helper to inject scan history messages into client history. + """ + history_msgs = [] + for scan_id, scan_number in history_args: + history_msgs.append(scan_history_factory(scan_id=scan_id, scan_number=scan_number)) + widget.client.history = ScanHistory(widget.client, False) + for msg in history_msgs: + widget.client.history._scan_data[msg.scan_id] = msg + widget.client.history._scan_ids.append(msg.scan_id) + widget.client.queue.scan_storage.current_scan = None + return history_msgs diff --git a/tests/unit_tests/conftest.py b/tests/unit_tests/conftest.py index f0160a92..db5427dc 100644 --- a/tests/unit_tests/conftest.py +++ b/tests/unit_tests/conftest.py @@ -5,8 +5,9 @@ import h5py import numpy as np import pytest from bec_lib import messages +from bec_lib.messages import _StoredDataInfo from pytestqt.exceptions import TimeoutError as QtBotTimeoutError -from qtpy.QtWidgets import QApplication +from qtpy.QtWidgets import QApplication, QMessageBox from bec_widgets.cli.rpc.rpc_register import RPCRegister from bec_widgets.utils import bec_dispatcher as bec_dispatcher_module @@ -123,9 +124,25 @@ def create_history_file(file_path, data: dict, metadata: dict) -> messages.ScanH elif isinstance(sub_value, dict): for sub_sub_key, sub_sub_value in sub_value.items(): sub_sub_group = metadata_bec[key].create_group(sub_key) + # Handle _StoredDataInfo objects + if isinstance(sub_sub_value, _StoredDataInfo): + # Store the numeric shape + sub_sub_group.create_dataset("shape", data=sub_sub_value.shape) + # Store the dtype as a UTF-8 string + dt = sub_sub_value.dtype or "" + sub_sub_group.create_dataset( + "dtype", data=dt, dtype=h5py.string_dtype(encoding="utf-8") + ) + continue if isinstance(sub_sub_value, list): - sub_sub_value = json.dumps(sub_sub_value) - sub_sub_group.create_dataset(sub_sub_key, data=sub_sub_value) + json_val = json.dumps(sub_sub_value) + sub_sub_group.create_dataset(sub_sub_key, data=json_val) + elif isinstance(sub_sub_value, dict): + for k2, v2 in sub_sub_value.items(): + val = json.dumps(v2) if isinstance(v2, list) else v2 + sub_sub_group.create_dataset(k2, data=val) + else: + sub_sub_group.create_dataset(sub_sub_key, data=sub_sub_value) else: metadata_bec[key].create_dataset(sub_key, data=sub_value) else: @@ -152,6 +169,8 @@ def create_history_file(file_path, data: dict, metadata: dict) -> messages.ScanH end_time=time.time(), num_points=metadata["num_points"], request_inputs=metadata["request_inputs"], + stored_data_info=metadata.get("stored_data_info"), + metadata={"scan_report_devices": metadata.get("scan_report_devices")}, ) return msg @@ -202,3 +221,102 @@ def grid_scan_history_msg(tmpdir): file_path = str(tmpdir.join("scan_1.h5")) return create_history_file(file_path, data, metadata) + + +@pytest.fixture +def scan_history_factory(tmpdir): + """ + Factory to create scan history messages with custom parameters. + Usage: + msg1 = scan_history_factory(scan_id="id1", scan_number=1, num_points=10) + msg2 = scan_history_factory(scan_id="id2", scan_number=2, scan_name="grid_scan", num_points=16) + """ + + def _factory( + scan_id: str = "test_scan", + scan_number: int = 1, + dataset_number: int = 1, + scan_name: str = "line_scan", + scan_type: str = "step", + num_points: int = 10, + x_range: tuple = (-5, 5), + y_range: tuple = (-5, 5), + ): + # Generate positions based on scan type + if scan_name == "grid_scan": + grid_size = int(np.sqrt(num_points)) + x_grid, y_grid = np.meshgrid( + np.linspace(x_range[0], x_range[1], grid_size), + np.linspace(y_range[0], y_range[1], grid_size), + ) + x_flat = x_grid.T.ravel() + y_flat = y_grid.T.ravel() + else: + x_flat = np.linspace(x_range[0], x_range[1], num_points) + y_flat = np.linspace(y_range[0], y_range[1], num_points) + positions = np.vstack((x_flat, y_flat)).T + num_pts = len(positions) + # Create dummy data + data = { + "baseline": {"bpm1a": {"bpm1a": {"value": [1], "timestamp": [100]}}}, + "monitored": { + "bpm4i": { + "bpm4i": { + "value": np.random.rand(num_points), + "timestamp": np.random.rand(num_points), + } + }, + "bpm3a": { + "bpm3a": { + "value": np.random.rand(num_points), + "timestamp": np.random.rand(num_points), + } + }, + "samx": {"samx": {"value": x_flat, "timestamp": np.arange(num_pts)}}, + "samy": {"samy": {"value": y_flat, "timestamp": np.arange(num_pts)}}, + }, + "async": { + "async_device": { + "async_device": { + "value": np.random.rand(num_pts * 10), + "timestamp": np.random.rand(num_pts * 10), + } + } + }, + } + metadata = { + "scan_id": scan_id, + "scan_name": scan_name, + "scan_type": scan_type, + "exit_status": "closed", + "scan_number": scan_number, + "dataset_number": dataset_number, + "request_inputs": { + "arg_bundle": [ + "samx", + x_range[0], + x_range[1], + num_pts, + "samy", + y_range[0], + y_range[1], + num_pts, + ], + "kwargs": {"relative": True}, + }, + "positions": positions.tolist(), + "num_points": num_pts, + "stored_data_info": { + "samx": {"samx": _StoredDataInfo(shape=(num_points,), dtype="float64")}, + "samy": {"samy": _StoredDataInfo(shape=(num_points,), dtype="float64")}, + "bpm4i": {"bpm4i": _StoredDataInfo(shape=(10,), dtype="float64")}, + "async_device": { + "async_device": _StoredDataInfo(shape=(num_points * 10,), dtype="float64") + }, + }, + "scan_report_devices": [b"samx"], + } + file_path = str(tmpdir.join(f"{scan_id}.h5")) + return create_history_file(file_path, data, metadata) + + return _factory diff --git a/tests/unit_tests/test_curve_settings.py b/tests/unit_tests/test_curve_settings.py index 585958f4..d7597d5f 100644 --- a/tests/unit_tests/test_curve_settings.py +++ b/tests/unit_tests/test_curve_settings.py @@ -2,10 +2,15 @@ import json from unittest.mock import MagicMock, patch import pytest +from bec_lib.scan_history import ScanHistory +from qtpy.QtGui import QValidator from qtpy.QtWidgets import QComboBox, QVBoxLayout from bec_widgets.widgets.plots.waveform.settings.curve_settings.curve_setting import CurveSetting -from bec_widgets.widgets.plots.waveform.settings.curve_settings.curve_tree import CurveTree +from bec_widgets.widgets.plots.waveform.settings.curve_settings.curve_tree import ( + CurveTree, + ScanIndexValidator, +) from bec_widgets.widgets.plots.waveform.waveform import Waveform from tests.unit_tests.client_mocks import dap_plugin_message, mocked_client, mocked_client_with_dap from tests.unit_tests.conftest import create_widget @@ -374,3 +379,53 @@ def test_export_data_dap(curve_tree_fixture): assert exported["signal"]["entry"] == "bpm4i" assert exported["signal"]["dap"] == "GaussianModel" assert exported["label"] == "bpm4i-bpm4i-GaussianModel" + + +def test_scan_index_validator_behavior(): + """ + Test ScanIndexValidator allows empty, 'live', partial 'live', valid scan numbers, + and rejects out-of-range or invalid inputs. + """ + validator = ScanIndexValidator(max_scan=3) + + def state(txt): + s, _, _ = validator.validate(txt, 0) + return s + + assert state("") == QValidator.Acceptable + assert state("live") == QValidator.Acceptable + assert state("l") == QValidator.Intermediate + assert state("liv") == QValidator.Intermediate + assert state("1") == QValidator.Acceptable + assert state("3") == QValidator.Acceptable + assert state("4") == QValidator.Invalid + assert state("0") == QValidator.Invalid + assert state("abc") == QValidator.Invalid + + +def test_export_data_history_curve(curve_tree_fixture, scan_history_factory): + """ + Test that export_data for a history curve row correctly serializes scan_number + and resets scan_id when a numeric scan is selected. + """ + curve_tree, wf = curve_tree_fixture + # Inject two history scans into the waveform client + msgs = [ + scan_history_factory(scan_id="hid1", scan_number=1), + scan_history_factory(scan_id="hid2", scan_number=2), + ] + wf.client.history = ScanHistory(wf.client, False) + for m in msgs: + wf.client.history._scan_data[m.scan_id] = m + wf.client.history._scan_ids.append(m.scan_id) + wf.client.queue.scan_storage.current_scan = None + + # Create a device row and select scan index "2" + device_row = curve_tree.add_new_curve(name="bpm4i", entry="bpm4i") + device_row.scan_index_combo.setCurrentText("2") + + exported = device_row.export_data() + assert exported["source"] == "history" + assert exported["scan_number"] == 2 + assert exported["scan_id"] is None + assert exported["label"] == "bpm4i-bpm4i-scan-2" diff --git a/tests/unit_tests/test_waveform_next_gen.py b/tests/unit_tests/test_waveform_next_gen.py index 4a1da243..01c48f6d 100644 --- a/tests/unit_tests/test_waveform_next_gen.py +++ b/tests/unit_tests/test_waveform_next_gen.py @@ -10,22 +10,19 @@ import pyqtgraph as pg import pytest from pyqtgraph.graphicsItems.DateAxisItem import DateAxisItem from qtpy.QtCore import QTimer -from qtpy.QtWidgets import ( - QApplication, - QCheckBox, - QDialog, - QDialogButtonBox, - QDoubleSpinBox, - QSpinBox, -) +from qtpy.QtWidgets import QApplication, QCheckBox, QDialog, QDialogButtonBox, QDoubleSpinBox from bec_widgets.widgets.plots.plot_base import UIMode from bec_widgets.widgets.plots.waveform.curve import DeviceSignal from bec_widgets.widgets.plots.waveform.waveform import Waveform +from bec_widgets.widgets.services.scan_history_browser.scan_history_browser import ( + ScanHistoryBrowser, +) from tests.unit_tests.client_mocks import ( DummyData, create_dummy_scan_item, dap_plugin_message, + inject_scan_history, mocked_client, mocked_client_with_dap, ) @@ -841,6 +838,33 @@ def test_show_dap_summary_popup(qtbot, mocked_client): assert fit_action.isChecked() is False +def test_show_scan_history_popup(qtbot, mocked_client): + """ + Test that show_scan_history_popup displays the scan history browser dialog + and toggles the toolbar action correctly. + """ + wf = create_widget(qtbot, Waveform, client=mocked_client) + scan_action = wf.toolbar.components.get_action("scan_history").action + # Initially unchecked and no dialog + assert not scan_action.isChecked() + assert wf.scan_history_dialog is None + + # Show the popup + wf.show_scan_history_popup() + # Dialog should exist and be visible, action checked + assert wf.scan_history_dialog is not None + assert wf.scan_history_dialog.isVisible() + assert scan_action.isChecked() + # The embedded widget should be the correct type + assert isinstance(wf.scan_history_widget, ScanHistoryBrowser) + + # Close the dialog (triggers _scan_history_closed) + wf.scan_history_dialog.close() + # Dialog reference should be cleared and action unchecked + assert wf.scan_history_dialog is None + assert not scan_action.isChecked() + + ##################################################### # The following tests are for the async dataset guard ##################################################### @@ -1063,3 +1087,187 @@ def test_dialog_reject_real_interaction(qtbot, mocked_client): assert wf.skip_large_dataset_warning is True # Limit remains unchanged assert wf.max_dataset_size_mb == 1 + + +def test_update_with_scan_history_by_index(qtbot, mocked_client, scan_history_factory): + """ + Test that update_with_scan_history by index loads the correct historical scan. + """ + wf = create_widget(qtbot, Waveform, client=mocked_client) + hist1, hist2 = inject_scan_history(wf, scan_history_factory, ("hist1", 1), ("hist2", 2)) + + assert len(wf.client.history._scan_ids) == 2, "Expected two history scans" + + # Do history curve plotting + wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id="hist1") + wf.plot(y_name="bpm4i", scan_number=2) + + assert len(wf.plot_item.curves) == 2, "Expected two curves for history scans" + c1, c2 = wf.plot_item.curves + # First curve should be for hist1, second for hist2 + assert c1.config.signal.name == "bpm4i" + assert c1.config.signal.entry == "bpm4i" + assert c1.config.scan_id == "hist1" + assert c1.config.scan_number == 1 + assert c1.name() == "bpm4i-bpm4i-scan-1" + + assert c2.config.signal.name == "bpm4i" + assert c2.config.signal.entry == "bpm4i" + assert c2.config.scan_id == "hist2" + assert c2.config.scan_number == 2 + assert c2.name() == "bpm4i-bpm4i-scan-2" + + +@pytest.mark.parametrize("mode", ["auto", "timestamp", "index", "samx"]) +def test_history_curve_x_modes_pre_plot(qtbot, mocked_client, scan_history_factory, mode): + """ + Test that history curves respect x_mode when set before plotting. + """ + wf = create_widget(qtbot, Waveform, client=mocked_client) + hist1, hist2 = inject_scan_history(wf, scan_history_factory, ("hist1", 1), ("hist2", 2)) + wf.x_mode = mode + c = wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id="hist1") + assert c.config.current_x_mode == mode + + +@pytest.mark.parametrize("mode", ["auto", "timestamp", "index", "samx"]) +def test_history_curve_x_modes_post_plot(qtbot, mocked_client, scan_history_factory, mode): + """ + Test that changing x_mode after plotting history curves updates the curve on refresh. + """ + wf = create_widget(qtbot, Waveform, client=mocked_client) + hist1, hist2 = inject_scan_history(wf, scan_history_factory, ("hist1", 1), ("hist2", 2)) + c = wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id="hist1") + # Change x_mode after plotting + wf.x_mode = mode + # Refresh history curves + wf._refresh_history_curves() + assert c.config.current_x_mode == mode + + +def test_history_curve_incompatible_x_mode_hides_curve(qtbot, mocked_client, scan_history_factory): + """ + Test that setting an x_mode not present in stored data hides the history curve. + """ + wf = create_widget(qtbot, Waveform, client=mocked_client) + wf.x_mode = "nonexistent_device" + # Inject history scan for this test + [history_msg] = inject_scan_history(wf, scan_history_factory, ("hist_bad", 1)) + # Plot history curve + c = wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id=history_msg.scan_id) + # Curve should be hidden due to incompatible x_mode + assert not c.isVisible() + + +def test_fetch_history_data_no_stored_data_raises( + qtbot, mocked_client, monkeypatch, suppress_message_box +): + """ + Test that fetching history data when stored_data_info is missing raises ValueError. + """ + wf = create_widget(qtbot, Waveform, client=mocked_client) + # Create a dummy scan_item lacking stored_data_info + dummy_scan = SimpleNamespace( + _msg=SimpleNamespace(stored_data_info=None), + devices={}, + metadata={"bec": {"scan_id": "dummy", "scan_number": 1, "scan_report_devices": []}}, + ) + # Force get_history_scan_item to return our dummy + monkeypatch.setattr(wf, "get_history_scan_item", lambda scan_id, scan_index: dummy_scan) + # Attempt to plot history curve should be suppressed by SafeSlot and return None + c = wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id="dummy", scan_number=1) + assert c is None + assert len(wf.curves) == 0 + + +def test_history_curve_device_missing_returns_none(qtbot, mocked_client, scan_history_factory): + """ + If the y-device is not in stored_data_info, plot should return None. + """ + wf = create_widget(qtbot, Waveform, client=mocked_client) + wf.x_mode = "index" + [history_msg] = inject_scan_history(wf, scan_history_factory, ("hist_dev_missing", 1)) + c = wf.plot(y_name="non-existing", y_entry="non-existing", scan_id=history_msg.scan_id) + assert c is None + + +def test_history_curve_custom_shape_mismatch_hides_curve( + qtbot, mocked_client, scan_history_factory +): + """ + For custom x-mode, if x and y shapes mismatch, curve should be hidden. + """ + wf = create_widget(qtbot, Waveform, client=mocked_client) + wf.x_mode = "async_device" + [history_msg] = inject_scan_history(wf, scan_history_factory, ("hist_custom_shape", 1)) + # Force shape mismatch for x-data + c = wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id=history_msg.scan_id) + assert c is not None + assert not c.isVisible() + + +def test_history_curve_index_mode_plots_curve(qtbot, mocked_client, scan_history_factory): + """ + Test that setting x_mode to 'index' plots and shows the history curve correctly. + """ + wf = create_widget(qtbot, Waveform, client=mocked_client) + wf.x_mode = "index" + [history_msg] = inject_scan_history(wf, scan_history_factory, ("hist_index", 1)) + c = wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id=history_msg.scan_id) + assert c is not None + assert c.isVisible() + assert c.config.current_x_mode == "index" + + +def test_history_curve_timestamp_mode_plots_curve(qtbot, mocked_client, scan_history_factory): + """ + Test that setting x_mode to 'timestamp' plots and shows the history curve correctly. + """ + wf = create_widget(qtbot, Waveform, client=mocked_client) + wf.x_mode = "timestamp" + [history_msg] = inject_scan_history(wf, scan_history_factory, ("hist_time", 1)) + c = wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id=history_msg.scan_id) + assert c is not None + assert c.isVisible() + assert c.config.current_x_mode == "timestamp" + + +def test_history_curve_auto_valid_uses_first_report_device( + qtbot, mocked_client, scan_history_factory +): + """ + Test that 'auto' x_mode uses the first available report device and shows the curve. + """ + wf = create_widget(qtbot, Waveform, client=mocked_client) + wf.x_mode = "auto" + [history_msg] = inject_scan_history(wf, scan_history_factory, ("hist_auto_valid", 1)) + # Plot history curve + c = wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id=history_msg.scan_id) + assert c is not None + assert c.isVisible() + # Should have fallen back to the first scan_report_device + assert c.config.current_x_mode == "auto" + + +def test_history_curve_file_not_found_returns_none(qtbot, mocked_client, scan_history_factory): + """ + If the history file path does not exist, plot should return None. + """ + wf = create_widget(qtbot, Waveform, client=mocked_client) + wf.x_mode = "index" + # Inject a valid history message then corrupt its file_path + [history_msg] = inject_scan_history(wf, scan_history_factory, ("bad_file", 1)) + history_msg.file_path = "/nonexistent/path.h5" + c = wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id=history_msg.scan_id) + assert c is None + + +def test_history_curve_scan_not_found_returns_none(qtbot, mocked_client): + """ + If the requested scan_id is not in history, plot should return None. + """ + wf = create_widget(qtbot, Waveform, client=mocked_client) + wf.x_mode = "index" + # No history scans injected for this widget + c = wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id="unknown_scan") + assert c is None