1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-10 02:30:54 +02:00

Compare commits

...

19 Commits

Author SHA1 Message Date
40d1fcb621 remove from_start option for register 2025-04-16 09:34:58 +02:00
7f549f02ce wip add wait for test with waveform in test_widgets_e2e 2025-04-15 21:54:31 +02:00
c6db680a9c fix add logging on sender 2025-04-15 21:28:19 +02:00
d1d9991aff wip: add log print in setup async curves 2025-04-15 21:16:25 +02:00
9d04b6e9d9 wip: test avoid registering 2 callbacks 2025-04-15 20:59:58 +02:00
0e27bf50c2 wip(conftest): scope function for gui client object. 2025-04-15 20:47:19 +02:00
6f1a472128 fix(on_async_readback): suppress history updates from old scan_id 2025-04-15 20:17:49 +02:00
120303b8db wip debug dispatcher 2025-04-15 14:02:31 +02:00
ffc3aebffd wip move processEvents before curves are resettet 2025-04-15 14:02:31 +02:00
f7538c4e60 fix(waveform): bug in setup async curves for plotting data from wrong scan_id 2025-04-15 14:02:31 +02:00
eb0b79a303 fix(test): update last scan data retrieval in async plotting test 2025-04-15 14:02:31 +02:00
6f13a4a11e fix: Skip VSCodeEditor in CI 2025-04-15 14:02:31 +02:00
2117b13840 refactor: add pragma no cover to various TYPE_CHECKING 2025-04-15 14:02:31 +02:00
f782994083 fix(formatter): fix formatting 2025-04-15 14:02:31 +02:00
de76087c7e fix(moduar-toolbar): fix cleanup of modular toolbar and dock_area 2025-04-15 14:02:31 +02:00
ca45268430 fix(website-widget): add super().cleanup() in website widget 2025-04-15 14:02:31 +02:00
2453734578 ci: run e2e tests without --flush-redis 2025-04-15 14:02:31 +02:00
0b8025fea9 feat: add rpc_widget e2e for all widgets 2025-04-15 14:02:31 +02:00
2f06059dab fix: RPC access enabled for certain widgets. 2025-04-15 14:02:31 +02:00
33 changed files with 956 additions and 69 deletions

View File

@@ -216,7 +216,7 @@ end-2-end-conda:
- pip install -e ./ophyd_devices
- pip install -e .[dev,pyside6]
- pytest -v --files-path ./ --start-servers --flush-redis --random-order ./tests/end-2-end
- pytest -v --files-path ./ --start-servers --random-order ./tests/end-2-end
artifacts:
when: on_failure

View File

@@ -24,22 +24,31 @@ class _WidgetsEnumType(str, enum.Enum):
_Widgets = {
"AbortButton": "AbortButton",
"BECDockArea": "BECDockArea",
"BECProgressBar": "BECProgressBar",
"BECQueue": "BECQueue",
"BECStatusBox": "BECStatusBox",
"DapComboBox": "DapComboBox",
"DarkModeButton": "DarkModeButton",
"DeviceBrowser": "DeviceBrowser",
"DeviceComboBox": "DeviceComboBox",
"DeviceLineEdit": "DeviceLineEdit",
"Image": "Image",
"LogPanel": "LogPanel",
"Minesweeper": "Minesweeper",
"MotorMap": "MotorMap",
"MultiWaveform": "MultiWaveform",
"PositionIndicator": "PositionIndicator",
"PositionerBox": "PositionerBox",
"PositionerBox2D": "PositionerBox2D",
"PositionerControlLine": "PositionerControlLine",
"ResetButton": "ResetButton",
"ResumeButton": "ResumeButton",
"RingProgressBar": "RingProgressBar",
"ScanControl": "ScanControl",
"ScatterWaveform": "ScatterWaveform",
"StopButton": "StopButton",
"TextBox": "TextBox",
"VSCodeEditor": "VSCodeEditor",
"Waveform": "Waveform",
@@ -72,6 +81,16 @@ for plugin_name, plugin_class in inspect.getmembers(plugin_client, inspect.iscla
globals()[plugin_name] = plugin_class
class AbortButton(RPCBase):
"""A button that abort the scan."""
@rpc_call
def remove(self):
"""
Cleanup the BECConnector
"""
class AutoUpdates(RPCBase):
@property
@rpc_call
@@ -445,6 +464,12 @@ class BECProgressBar(RPCBase):
>>> progressbar.label_template = "$value / $percentage %"
"""
@rpc_call
def _get_label(self) -> str:
"""
Return the label text. mostly used for testing rpc.
"""
class BECQueue(RPCBase):
"""Widget to display the BEC queue."""
@@ -460,9 +485,9 @@ class BECStatusBox(RPCBase):
"""An autonomous widget to display the status of BEC services."""
@rpc_call
def remove(self):
def get_server_state(self) -> "str":
"""
Cleanup the BECConnector
Get the state ("RUNNING", "BUSY", "IDLE", "ERROR") of the BEC server
"""
@@ -661,6 +686,14 @@ class DapComboBox(RPCBase):
"""
class DarkModeButton(RPCBase):
@rpc_call
def remove(self):
"""
Cleanup the BECConnector
"""
class DeviceBrowser(RPCBase):
"""DeviceBrowser is a widget that displays all available devices in the current BEC session."""
@@ -1365,23 +1398,10 @@ class ImageItem(RPCBase):
class LogPanel(RPCBase):
"""Displays a log panel"""
@rpc_call
def set_plain_text(self, text: str) -> None:
"""
Set the plain text of the widget.
...
Args:
text (str): The text to set.
"""
@rpc_call
def set_html_text(self, text: str) -> None:
"""
Set the HTML text of the widget.
Args:
text (str): The text to set.
"""
class Minesweeper(RPCBase): ...
class MotorMap(RPCBase):
@@ -2227,6 +2247,54 @@ class PositionIndicator(RPCBase):
"""
class PositionerBox(RPCBase):
"""Simple Widget to control a positioner in box form"""
@rpc_call
def set_positioner(self, positioner: "str | Positioner"):
"""
Set the device
Args:
positioner (Positioner | str) : Positioner to set, accepts str or the device
"""
class PositionerBox2D(RPCBase):
"""Simple Widget to control two positioners in box form"""
@rpc_call
def set_positioner_hor(self, positioner: "str | Positioner"):
"""
Set the device
Args:
positioner (Positioner | str) : Positioner to set, accepts str or the device
"""
@rpc_call
def set_positioner_ver(self, positioner: "str | Positioner"):
"""
Set the device
Args:
positioner (Positioner | str) : Positioner to set, accepts str or the device
"""
class PositionerControlLine(RPCBase):
"""A widget that controls a single device."""
@rpc_call
def set_positioner(self, positioner: "str | Positioner"):
"""
Set the device
Args:
positioner (Positioner | str) : Positioner to set, accepts str or the device
"""
class PositionerGroup(RPCBase):
"""Simple Widget to control a positioner in box form"""
@@ -2239,6 +2307,26 @@ class PositionerGroup(RPCBase):
"""
class ResetButton(RPCBase):
"""A button that resets the scan queue."""
@rpc_call
def remove(self):
"""
Cleanup the BECConnector
"""
class ResumeButton(RPCBase):
"""A button that continue scan queue."""
@rpc_call
def remove(self):
"""
Cleanup the BECConnector
"""
class Ring(RPCBase):
@rpc_call
def _get_all_rpc(self) -> "dict":
@@ -2883,6 +2971,16 @@ class ScatterWaveform(RPCBase):
"""
class StopButton(RPCBase):
"""A button that stops the current scan."""
@rpc_call
def remove(self):
"""
Cleanup the BECConnector
"""
class TextBox(RPCBase):
"""A widget that displays text in plain and HTML format"""

View File

@@ -16,7 +16,7 @@ from qtpy.QtCore import Signal as pyqtSignal
logger = bec_logger.logger
if TYPE_CHECKING:
if TYPE_CHECKING: # pragma: no cover
from bec_lib.endpoints import EndpointInfo
from bec_widgets.utils.rpc_server import RPCServer
@@ -25,9 +25,9 @@ if TYPE_CHECKING:
class QtThreadSafeCallback(QObject):
cb_signal = pyqtSignal(dict, dict)
def __init__(self, cb):
def __init__(self, cb, cb_info: dict | None = None):
super().__init__()
self.cb_info = cb_info
self.cb = cb
self.cb_signal.connect(self.cb)
@@ -137,6 +137,7 @@ class BECDispatcher:
self,
slot: Callable,
topics: Union[EndpointInfo, str, list[Union[EndpointInfo, str]]],
cb_info: dict | None = None,
**kwargs,
) -> None:
"""Connect widget's qt slot, so that it is called on new pub/sub topic message.
@@ -146,7 +147,7 @@ class BECDispatcher:
the corresponding pub/sub message
topics (EndpointInfo | str | list): A topic or list of topics that can typically be acquired via bec_lib.MessageEndpoints
"""
slot = QtThreadSafeCallback(slot)
slot = QtThreadSafeCallback(slot, cb_info=cb_info)
self.client.connector.register(topics, cb=slot, **kwargs)
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
self._slots[slot].update(set(topics_str))

View File

@@ -11,7 +11,7 @@ from pydantic_core import PydanticCustomError
from qtpy.QtGui import QColor
from qtpy.QtWidgets import QApplication
if TYPE_CHECKING:
if TYPE_CHECKING: # pragma: no cover
from bec_qthemes._main import AccentColors

View File

@@ -19,7 +19,7 @@ from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.error_popups import ErrorPopupUtility
if TYPE_CHECKING:
if TYPE_CHECKING: # pragma: no cover
from bec_lib import messages
from qtpy.QtCore import QObject
else:

View File

@@ -59,7 +59,7 @@ class SidePanel(QWidget):
self.main_layout.setContentsMargins(0, 0, 0, 0)
self.main_layout.setSpacing(0)
self.toolbar = ModularToolBar(target_widget=self, orientation="vertical")
self.toolbar = ModularToolBar(parent=self, target_widget=self, orientation="vertical")
self.container = QWidget()
self.container.layout = QVBoxLayout(self.container)
@@ -89,7 +89,7 @@ class SidePanel(QWidget):
self.main_layout.setContentsMargins(0, 0, 0, 0)
self.main_layout.setSpacing(0)
self.toolbar = ModularToolBar(target_widget=self, orientation="horizontal")
self.toolbar = ModularToolBar(parent=self, target_widget=self, orientation="horizontal")
self.container = QWidget()
self.container.layout = QVBoxLayout(self.container)

View File

@@ -521,7 +521,7 @@ class ModularToolBar(QToolBar):
orientation: Literal["horizontal", "vertical"] = "horizontal",
background_color: str = "rgba(0, 0, 0, 0)",
):
super().__init__(parent)
super().__init__(parent=parent)
self.widgets = defaultdict(dict)
self.background_color = background_color

View File

@@ -15,7 +15,7 @@ from bec_widgets.utils.container_utils import WidgetContainerUtils
logger = bec_logger.logger
if TYPE_CHECKING:
if TYPE_CHECKING: # pragma: no cover
from qtpy.QtWidgets import QWidget
from bec_widgets.widgets.containers.dock.dock_area import BECDockArea

View File

@@ -102,7 +102,7 @@ class BECDockArea(BECWidget, QWidget):
self._instructions_visible = True
self.dark_mode_button = DarkModeButton(parent=self, parent_id=self.gui_id, toolbar=True)
self.dock_area = DockArea()
self.dock_area = DockArea(parent=self)
self.toolbar = ModularToolBar(
parent=self,
actions={
@@ -440,12 +440,6 @@ class BECDockArea(BECWidget, QWidget):
Cleanup the dock area.
"""
self.delete_all()
self.toolbar.close()
self.toolbar.deleteLater()
self.dark_mode_button.close()
self.dark_mode_button.deleteLater()
self.dock_area.close()
self.dock_area.deleteLater()
super().cleanup()
def show(self):

View File

@@ -11,7 +11,7 @@ class AbortButton(BECWidget, QWidget):
PLUGIN = True
ICON_NAME = "cancel"
RPC = False
RPC = True
def __init__(
self,

View File

@@ -11,7 +11,7 @@ class ResetButton(BECWidget, QWidget):
PLUGIN = True
ICON_NAME = "restart_alt"
RPC = False
RPC = True
def __init__(self, parent=None, client=None, config=None, gui_id=None, toolbar=False, **kwargs):
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)

View File

@@ -11,7 +11,7 @@ class ResumeButton(BECWidget, QWidget):
PLUGIN = True
ICON_NAME = "resume"
RPC = False
RPC = True
def __init__(self, parent=None, client=None, config=None, gui_id=None, toolbar=False, **kwargs):
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)

View File

@@ -11,7 +11,7 @@ class StopButton(BECWidget, QWidget):
PLUGIN = True
ICON_NAME = "dangerous"
RPC = False
RPC = True
def __init__(self, parent=None, client=None, config=None, gui_id=None, toolbar=False, **kwargs):
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)

View File

@@ -31,6 +31,7 @@ class PositionerBox(PositionerBoxBase):
dimensions = (234, 224)
PLUGIN = True
RPC = True
USER_ACCESS = ["set_positioner"]
device_changed = Signal(str, str)

View File

@@ -33,6 +33,7 @@ class PositionerBox2D(PositionerBoxBase):
ui_file = "positioner_box_2d.ui"
PLUGIN = True
RPC = True
USER_ACCESS = ["set_positioner_hor", "set_positioner_ver"]
device_changed_hor = Signal(str, str)

View File

@@ -33,7 +33,7 @@ from bec_widgets.widgets.editors.scan_metadata._util import (
field_precision,
)
if TYPE_CHECKING:
if TYPE_CHECKING: # pragma: no cover
from pydantic.fields import FieldInfo
logger = bec_logger.logger

View File

@@ -9,7 +9,7 @@ from annotated_types import Ge, Gt, Le, Lt
from bec_lib.logger import bec_logger
from pydantic_core import PydanticUndefined
if TYPE_CHECKING:
if TYPE_CHECKING: # pragma: no cover
from pydantic.fields import FieldInfo
logger = bec_logger.logger

View File

@@ -29,7 +29,7 @@ from bec_widgets.widgets.editors.scan_metadata.additional_metadata_table import
AdditionalMetadataTable,
)
if TYPE_CHECKING:
if TYPE_CHECKING: # pragma: no cover
from pydantic.fields import FieldInfo
logger = bec_logger.logger

View File

@@ -117,6 +117,7 @@ class WebsiteWidget(BECWidget, QWidget):
Cleanup the widget
"""
self.website.page().deleteLater()
super().cleanup()
if __name__ == "__main__":

View File

@@ -144,7 +144,7 @@ class Minesweeper(BECWidget, QWidget):
PLUGIN = True
ICON_NAME = "videogame_asset"
USER_ACCESS = []
RPC = False
RPC = True
def __init__(self, parent=None, *args, **kwargs):
super().__init__(parent=parent, *args, **kwargs)

View File

@@ -101,7 +101,7 @@ class PlotBase(BECWidget, QWidget):
self.plot_item = pg.PlotItem(viewBox=BECViewBox(enableMenu=True))
self.plot_widget.addItem(self.plot_item)
self.side_panel = SidePanel(self, orientation="left", panel_max_width=280)
self.toolbar = ModularToolBar(target_widget=self, orientation="horizontal")
self.toolbar = ModularToolBar(parent=self, target_widget=self, orientation="horizontal")
self._init_toolbar()
# PlotItem Addons

View File

@@ -386,7 +386,7 @@ class CurveTree(BECWidget, QWidget):
def _init_toolbar(self):
"""Initialize the toolbar with actions: add, send, refresh, expand, collapse, renormalize."""
self.toolbar = ModularToolBar(target_widget=self, orientation="horizontal")
self.toolbar = ModularToolBar(parent=self, target_widget=self, orientation="horizontal")
add = MaterialIconAction(
icon_name="add", tooltip="Add new curve", checkable=False, parent=self
)

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import json
import traceback
from typing import Literal
import lmfit
@@ -1012,6 +1013,7 @@ class Waveform(PlotBase):
self.auto_range_y = True
self.old_scan_id = self.scan_id
self.scan_id = current_scan_id
# What if there is not yet a scan item here
self.scan_item = self.queue.scan_storage.find_scan_by_ID(self.scan_id) # live scan
self._slice_index = None # Reset the slice index
@@ -1162,10 +1164,12 @@ class Waveform(PlotBase):
Args:
curve(Curve): The curve to set up.
"""
logger.warning(f"SETUP ASYNC CURVE {curve.name()} for scan ID {self.scan_id}")
name = curve.config.signal.name
self.bec_dispatcher.disconnect_slot(
self.on_async_readback, MessageEndpoints.device_async_readback(self.old_scan_id, name)
)
QApplication.processEvents() # Process events to avoid async callbacks scheduled but executed in the wrong order
try:
curve.clear_data()
except KeyError:
@@ -1174,7 +1178,8 @@ class Waveform(PlotBase):
self.bec_dispatcher.connect_slot(
self.on_async_readback,
MessageEndpoints.device_async_readback(self.scan_id, name),
from_start=True,
cb_info={"scan_id": self.scan_id},
# from_start=True,
)
logger.info(f"Setup async curve {name}")
@@ -1197,6 +1202,15 @@ class Waveform(PlotBase):
msg(dict): Message with the async data.
metadata(dict): Metadata of the message.
"""
sender = self.sender()
logger.info(f"Update from sender {sender}")
if sender and hasattr(sender, "cb_info"):
scan_id = sender.cb_info.get("scan_id", None)
logger.info(
f"Update from sender {sender} with scan ID {scan_id} for curve {self.scan_id}"
)
if scan_id != self.scan_id:
return # Ignore messages from other scans
instruction = metadata.get("async_update", {}).get("type")
if instruction not in ["add", "add_slice", "replace"]:
logger.warning(f"Invalid async update instruction: {instruction}")
@@ -1205,6 +1219,7 @@ class Waveform(PlotBase):
plot_mode = self.x_axis_mode["name"]
for curve in self._async_curves:
x_data = None # Reset x_data
y_data = None
# Get the curve data
async_data = msg["signals"].get(curve.config.signal.entry, None)
if async_data is None:
@@ -1223,6 +1238,7 @@ class Waveform(PlotBase):
data_plot_y = data_plot_y[-1, :]
else:
x_data, y_data = curve.get_data()
if y_data is not None:
data_plot_y = np.hstack((y_data, data_plot_y))
# Add slice
@@ -1533,7 +1549,11 @@ class Waveform(PlotBase):
Categorise the device curves into sync and async based on the readout priority.
"""
if self.scan_item is None:
self.update_with_scan_history(-1)
# If scan_item is None, than update_with_scan_history will call `_categorise_device_curves` via
# `_emit_signal_update`. All of this happens while old_scan_id is still set. So we actually register
# a callback twice for the same scan_id. This is a problem since data will be duplicated.
# self.update_with_scan_history(-1)
if self.scan_item is None:
logger.info("No scan executed so far; skipping device curves categorisation.")
return "none"

View File

@@ -21,6 +21,7 @@ class BECProgressBar(BECWidget, QWidget):
"set_minimum",
"label_template",
"label_template.setter",
"_get_label",
]
ICON_NAME = "page_control"
@@ -79,6 +80,10 @@ class BECProgressBar(BECWidget, QWidget):
"""
return self._label_template
def _get_label(self) -> str:
"""Return the label text. mostly used for testing rpc."""
return self.center_label.text()
@label_template.setter
def label_template(self, template):
self._label_template = template

View File

@@ -76,6 +76,7 @@ class BECQueue(BECWidget, CompactPopupWidget):
widget_label = QLabel("Live Queue")
widget_label.setStyleSheet("font-weight: bold;")
self.toolbar = ModularToolBar(
parent=self,
actions={
"widget_label": WidgetAction(widget=widget_label),
"separator_1": SeparatorAction(),

View File

@@ -16,7 +16,7 @@ from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.compact_popup import CompactPopupWidget
from bec_widgets.widgets.services.bec_status_box.status_item import StatusItem
if TYPE_CHECKING:
if TYPE_CHECKING: # pragma: no cover
from bec_lib.client import BECClient
# TODO : Put normal imports back when Pydantic gets faster
@@ -76,6 +76,7 @@ class BECStatusBox(BECWidget, CompactPopupWidget):
PLUGIN = True
CORE_SERVICES = ["DeviceServer", "ScanServer", "SciHub", "ScanBundler", "FileWriterManager"]
USER_ACCESS = ["get_server_state"]
service_update = Signal(BECServiceInfoContainer)
bec_core_state = Signal(str)
@@ -299,6 +300,10 @@ class BECStatusBox(BECWidget, CompactPopupWidget):
if objects["item"] == item:
objects["widget"].show_popup()
def get_server_state(self) -> str:
"""Get the state ("RUNNING", "BUSY", "IDLE", "ERROR") of the BEC server"""
return self.status_container[self.box_name]["info"].status
def cleanup(self):
"""Cleanup the BECStatusBox widget."""
self.bec_service_status.cleanup()

View File

@@ -7,7 +7,7 @@ from qtpy.QtCore import QMimeData, Qt
from qtpy.QtGui import QDrag
from qtpy.QtWidgets import QApplication, QHBoxLayout, QLabel, QWidget
if TYPE_CHECKING:
if TYPE_CHECKING: # pragma: no cover
from qtpy.QtGui import QMouseEvent
logger = bec_logger.logger

View File

@@ -8,7 +8,7 @@ import re
from collections import deque
from functools import partial, reduce
from re import Pattern
from typing import Literal
from typing import TYPE_CHECKING, Literal
from bec_lib.client import BECClient
from bec_lib.connector import ConnectorBase
@@ -52,6 +52,9 @@ from bec_widgets.widgets.utility.logpanel._util import (
simple_color_format,
)
if TYPE_CHECKING: # pragma: no cover
from PySide6.QtCore import SignalInstance
logger = bec_logger.logger
MODULE_PATH = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
@@ -396,6 +399,8 @@ class LogPanel(TextBox):
_new_messages = Signal()
service_list_update = Signal(dict, set)
USER_ACCESS = [] # Overwrite TextBox USER_ACCESS
def __init__(
self,
parent=None,

View File

@@ -12,7 +12,7 @@ class DarkModeButton(BECWidget, QWidget):
ICON_NAME = "dark_mode"
PLUGIN = True
RPC = False
RPC = True
def __init__(
self,

View File

@@ -3,6 +3,10 @@
import random
import pytest
from bec_lib.client import BECClient
from bec_lib.redis_connector import RedisConnector
from bec_lib.service_config import ServiceConfig
from bec_lib.tests.utils import wait_for_empty_queue
from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process
from bec_widgets.utils import BECDispatcher
@@ -28,30 +32,64 @@ def gui_id():
return f"figure_{random.randint(0,100)}" # make a new gui id each time, to ensure no 'gui is alive' zombie key can perturbate
@pytest.fixture
def connected_client_gui_obj(gui_id, bec_client_lib):
@pytest.fixture(scope="function")
def connected_client_gui_obj(qtbot, gui_id, bec_client_lib):
"""
Fixture to create a new BECGuiClient object and start a server in the background.
This fixture should be used if a new gui instance is needed for each test.
"""
def _check_gui_has_bec():
return hasattr(gui, "bec") and gui.bec is not None
gui = BECGuiClient(gui_id=gui_id)
try:
gui.start(wait=True)
qtbot.waitUntil(_check_gui_has_bec, timeout=7000)
yield gui
finally:
gui.kill_server()
@pytest.fixture(scope="session")
def connected_gui_with_scope_session(gui_id, bec_client_lib):
def bec_client_lib_with_demo_config_session(
bec_redis_fixture, bec_services_config_file_path, bec_servers
):
"""Session-scoped fixture to create a BECClient object with a demo configuration."""
config = ServiceConfig(bec_services_config_file_path)
bec = BECClient(config, RedisConnector, forced=True, wait_for_server=True)
bec.start()
bec.config.load_demo_config()
try:
yield bec
finally:
bec.shutdown()
@pytest.fixture(scope="session")
def bec_client_lib_session(bec_client_lib_with_demo_config_session):
"""Session-scoped fixture to create a BECClient object with a demo configuration."""
bec = bec_client_lib_with_demo_config_session
bec.queue.request_queue_reset()
bec.queue.request_scan_continuation()
wait_for_empty_queue(bec)
yield bec
@pytest.fixture(scope="session")
def connected_gui_and_bec_with_scope_session(bec_client_lib_session):
"""
Fixture to create a new BECGuiClient object and start a server in the background.
This fixture is scoped to the session, meaning it remains alive for all tests in the session.
We can use this fixture to create a gui object that is used across multiple tests, and
simulate a real-world scenario where the gui is not restarted for each test.
Returns:
The gui object as for the CLI and bec_client_lib object.
"""
gui_id = "GUIMainWindow_TEST"
gui = BECGuiClient(gui_id=gui_id)
try:
gui.start(wait=True)

View File

@@ -3,11 +3,14 @@ import time
import numpy as np
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_widgets.cli.client import Image, MotorMap, MultiWaveform, ScatterWaveform, Waveform
from bec_widgets.cli.rpc.rpc_base import RPCReference
from bec_widgets.tests.utils import check_remote_data_size
logger = bec_logger.logger
def test_rpc_waveform1d_custom_curve(qtbot, connected_client_gui_obj):
gui = connected_client_gui_obj
@@ -111,6 +114,7 @@ def test_rpc_waveform_scan(qtbot, bec_client_lib, connected_client_gui_obj):
def test_async_plotting(qtbot, bec_client_lib, connected_client_gui_obj):
logger.warning("Starting async plotting test in e2e test.")
gui = connected_client_gui_obj
dock = gui.bec
@@ -121,9 +125,9 @@ def test_async_plotting(qtbot, bec_client_lib, connected_client_gui_obj):
# Test add
dev.waveform.sim.select_model("GaussianModel")
dev.waveform.sim.params = {"amplitude": 1000, "center": 4000, "sigma": 300}
dev.waveform.sim.params = {"amplitude": 1000, "center": 400, "sigma": 300}
dev.waveform.async_update.put("add")
dev.waveform.waveform_shape.put(10000)
dev.waveform.waveform_shape.put(1000) # Do not reduce, data needs to be large to downsample
wf = dock.new("wf_dock").new("Waveform")
curve = wf.plot(y_name="waveform")
@@ -131,28 +135,24 @@ def test_async_plotting(qtbot, bec_client_lib, connected_client_gui_obj):
status.wait()
# Wait for the scan to finish and the data to be available in history
# Wait until scan_id is in history
def _wait_for_scan_in_history():
if len(client.history) == 0:
return False
# Once items appear in storage, the last one hast to be the one we just scanned
return client.history[-1].metadata.bec["scan_id"] == status.scan.scan_id
# Get scan item from history
scan_item = client.history.get_by_scan_id(status.scan.scan_id)
return scan_item is not None
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# Get all data
waveform_data = client.history[-1].devices.waveform.waveform_waveform.read()["value"]
qtbot.waitUntil(_wait_for_scan_in_history, timeout=10000)
last_scan_data = client.history[-1]
# check plotted data
x_data, y_data = curve.get_data()
assert np.array_equal(x_data, np.linspace(0, len(y_data) - 1, len(y_data)))
assert np.array_equal(
y_data, last_scan_data.devices.waveform.get("waveform_waveform", {}).read().get("value", [])
)
assert np.array_equal(y_data, waveform_data)
# Check displayed data
x_data_display, y_data_display = curve._get_displayed_data()
# Should be not more than 1% difference, actually be closer but this might be flaky
assert np.isclose(x_data_display[-1], x_data[-1], rtol=0.01)
# Downsampled data should be smaller than original data
assert len(y_data_display) < len(y_data)
def test_rpc_image(qtbot, bec_client_lib, connected_client_gui_obj):

View File

@@ -0,0 +1,717 @@
"""
End-to-end tests single gui instance across the full session.
Each test will use the same gui instance, simulating a real-world scenario where the gui is not
restarted for each test. The interaction is tested through the rpc calls.
Note: wait_for_namespace_created is a utility method that helps to wait for the namespace to be
created in the gui. This is necessary because the rpc calls are asynchronous and the namespace
may not be created immediately after the rpc call is made.
"""
import random
from typing import Generator
import numpy as np
import pytest
from bec_widgets.cli.rpc.rpc_base import RPCBase, RPCReference
PYTEST_TIMEOUT = 50
# pylint: disable=redefined-outer-name
# pylint: disable=too-many-arguments
# pylint: disable=protected-access
# pylint: disable=unused-variable
def wait_for_namespace_change(
qtbot,
gui: RPCBase,
parent_widget: RPCBase | RPCReference,
widget_name: str,
widget_gui_id: str,
timeout: int = 10000,
exists: bool = True,
):
"""Utility method to wait for the namespace to be created in the widget."""
# GUI object is not registered in the registry (yet)
if parent_widget is gui:
def check_reference_registered():
# Check that the widget is in ipython registry
obj = gui._ipython_registry.get(widget_gui_id, None)
if obj is None:
if not exists:
return True
return False
# _rpc_references do not exist on BECGuiClient class somehow..
else:
def check_reference_registered():
obj = gui._ipython_registry.get(widget_gui_id, None)
if obj is None:
if not exists:
return True
return False
ref = parent_widget._rpc_references.get(widget_gui_id, None)
if exists:
return ref is not None
return ref is None
try:
qtbot.waitUntil(check_reference_registered, timeout=timeout)
except Exception as e:
raise RuntimeError(
f"Timeout waiting for {parent_widget.widget_name}.{widget_name} to be created."
) from e
@pytest.fixture(scope="session")
def random_generator_from_seed(request):
"""Fixture to get a random seed for the following tests."""
seed = request.config.getoption("--random-order-seed").split(":")[-1]
try:
seed = int(seed)
except ValueError: # Should not be required...
seed = 42
rng = random.Random(seed)
yield rng
# @pytest.fixture(scope="session")
# def random_generator_from_seed(random_number_gen):
# for val in random_number_gen:
# yield val
def create_widget(
qtbot, gui: RPCBase, widget_cls_name: str
) -> tuple[RPCReference, RPCReference, RPCReference]:
"""Utility method to create a widget and wait for the namespaces to be created."""
dock_area = gui.new()
wait_for_namespace_change(qtbot, gui, gui, dock_area.widget_name, dock_area._gui_id)
dock = dock_area.new(widget=widget_cls_name)
wait_for_namespace_change(qtbot, gui, dock_area, dock.widget_name, dock._gui_id)
widget = dock.element_list[-1]
wait_for_namespace_change(qtbot, gui, dock, widget.widget_name, widget._gui_id)
return dock_area, dock, widget
def maybe_remove_widget(
qtbot, gui: RPCBase, dock: RPCReference, widget: RPCReference, random_int_gen: random.Random
):
"""Utility method to remove the widget with a 50% chance."""
random_int = random_int_gen.randint(0, 100)
if random_int >= 50:
# Needed, reference gets deleted in the gui
name = widget.widget_name
gui_id = widget._gui_id
dock.delete(widget.widget_name)
wait_for_namespace_change(qtbot, gui, dock, name, gui_id, exists=False)
def maybe_remove_dock_area(
qtbot, gui: RPCBase, dock_area: RPCReference, random_int_gen: random.Random
):
"""Utility method to remove the dock area with a 50% chance."""
random_int = random_int_gen.randint(0, 100)
if random_int >= 50:
# Needed, reference gets deleted in the gui
name = dock_area.widget_name
gui_id = dock_area._gui_id
gui.delete(dock_area.widget_name)
wait_for_namespace_change(qtbot, gui, gui, name, gui_id, exists=False)
@pytest.mark.timeout(100)
def test_all_available_widgets(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""This test simply checks that all widgets that are available via gui.available_widgets can be created and removed."""
gui = connected_gui_and_bec_with_scope_session
for widget_name in gui.available_widgets.__dict__:
# Skip private attributes
if widget_name.startswith("_"):
continue
if widget_name == "VSCodeEditor":
continue # Not installed in docker image for CI, so we skip it.
dock_area, dock, widget = create_widget(
qtbot, gui, getattr(gui.available_widgets, widget_name)
)
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_abort_button(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the AbortButton widget."""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.AbortButton)
# No rpc calls to check so far
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_bec_progress_bar(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the BECProgressBar widget."""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.BECProgressBar)
# Check rpc calls
assert widget.label_template == "$value / $maximum - $percentage %"
widget.set_maximum(100)
widget.set_minimum(50)
widget.set_value(75)
assert widget._get_label() == "75 / 100 - 50 %"
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_bec_queue(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the BECQueue widget."""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.BECQueue)
# No rpc calls to test so far
# maybe we can add an rpc call to check the queue length
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_bec_status_box(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the BECStatusBox widget."""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.BECStatusBox)
# Check rpc calls
assert widget.get_server_state() in ["RUNNING", "IDLE", "BUSY", "ERROR"]
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_dap_combo_box(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the DAPComboBox widget."""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.DapComboBox)
# Check rpc calls
widget.select_fit_model("PseudoVoigtModel")
widget.select_x_axis("samx")
widget.select_y_axis("bpm4i")
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_device_browser(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the DeviceBrowser widget."""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.DeviceBrowser)
# No rpc calls yet to check
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_device_combo_box(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the DeviceComboBox widget."""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.DeviceComboBox)
# No rpc calls to check so far, maybe set_device should be exposed
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_device_line_edit(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the DeviceLineEdit widget."""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.DeviceLineEdit)
# No rpc calls to check so far
# Should probably have a set_device method
# No rpc calls to check so far, maybe set_device should be exposed
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_image(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the Image widget."""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.Image)
scans = bec.scans
dev = bec.device_manager.devices
# Test rpc calls
img = widget.image(dev.eiger)
assert img.get_data() is None
# Run a scan and plot the image
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
s.wait()
def _wait_for_scan_in_history():
# Get scan item from history
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
return scan_item is not None
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# Check that last image is equivalent to data in Redis
last_img = bec.device_monitor.get_data(
dev.eiger, count=1
) # Get last image from Redis monitor 2D endpoint
assert np.allclose(img.get_data(), last_img)
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_log_panel(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the LogPanel widget."""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.LogPanel)
# No rpc calls to check so far
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_minesweeper(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the MineSweeper widget."""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.Minesweeper)
# No rpc calls to check so far
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_motor_map(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the MotorMap widget."""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.MotorMap)
# Test RPC calls
dev = bec.device_manager.devices
scans = bec.scans
# Set motor map to names
widget.map(dev.samx, dev.samy)
# Move motor samx to pos
pos = dev.samx.limits[1] - 1 # -1 from higher limit
scans.mv(dev.samx, pos, relative=False).wait()
# Check that data is up to date
assert np.isclose(widget.get_data()["x"][-1], pos, dev.samx.precision)
# Move motor samy to pos
pos = dev.samy.limits[0] + 1 # +1 from lower limit
scans.mv(dev.samy, pos, relative=False).wait()
# Check that data is up to date
assert np.isclose(widget.get_data()["y"][-1], pos, dev.samy.precision)
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_multi_waveform(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test MultiWaveform widget."""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.MultiWaveform)
# Test RPC calls
dev = bec.device_manager.devices
scans = bec.scans
# test plotting
cm = "cividis"
widget.plot(dev.waveform, color_palette=cm)
assert widget.monitor == dev.waveform.name
assert widget.color_palette == cm
# Scan with BEC
s = scans.line_scan(dev.samx, -3, 3, steps=5, exp_time=0.01, relative=False)
s.wait()
def _wait_for_scan_in_history():
# Get scan item from history
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
return scan_item is not None
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# Wait for data in history (should be plotted?)
# TODO how can we check that the data was plotted, implement get_data()
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_positioner_indicator(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the PositionIndicator widget."""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.PositionIndicator)
# TODO check what these rpc calls are supposed to do! Issue created #461
widget.set_value(5)
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_positioner_box(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the PositionerBox widget."""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.PositionerBox)
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
# No rpc calls to check so far
widget.set_positioner(dev.samx)
widget.set_positioner(dev.samy.name)
scans.mv(dev.samy, -3, relative=False).wait()
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_positioner_box_2d(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the PositionerBox2D widget."""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.PositionerBox2D)
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
# No rpc calls to check so far
widget.set_positioner_hor(dev.samx)
widget.set_positioner_ver(dev.samy)
# Try moving the motors
scans.mv(dev.samx, 3, relative=False).wait()
scans.mv(dev.samy, -3, relative=False).wait()
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_positioner_control_line(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the positioner control line widget"""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.PositionerControlLine)
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
# Set positioner
widget.set_positioner(dev.samx)
scans.mv(dev.samx, 3, relative=False).wait()
widget.set_positioner(dev.samy.name)
scans.mv(dev.samy, -3, relative=False).wait()
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_ring_progress_bar(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the RingProgressBar widget"""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.RingProgressBar)
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
# Do a scan
scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False).wait()
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_scan_control(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the ScanControl widget"""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.ScanControl)
# No rpc calls to check so far
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_scatter_waveform(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the ScatterWaveform widget"""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.ScatterWaveform)
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
widget.plot(dev.samx, dev.samy, dev.bpm4i)
scans.grid_scan(dev.samx, -5, 5, 5, dev.samy, -5, 5, 5, exp_time=0.01, relative=False).wait()
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_stop_button(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the StopButton widget"""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.StopButton)
# No rpc calls to check so far
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_resume_button(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the StopButton widget"""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.ResumeButton)
# No rpc calls to check so far
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_reset_button(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the StopButton widget"""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.ResetButton)
# No rpc calls to check so far
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_text_box(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the TextBox widget"""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.TextBox)
# RPC calls
widget.set_plain_text("Hello World")
widget.set_html_text("<b> Hello World HTML </b>")
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_waveform(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the Waveform widget"""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.Waveform)
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
widget.plot(dev.bpm4i)
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
s.wait()
def _wait_for_scan_in_history():
# Get scan item from history
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
return scan_item is not None
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
samx_data = scan_item.devices.samx.samx.read()["value"]
bpm4i_data = scan_item.devices.bpm4i.bpm4i.read()["value"]
curve = widget.curves[0]
assert np.allclose(curve.get_data()[0], samx_data)
assert np.allclose(curve.get_data()[1], bpm4i_data)
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_website_widget(
qtbot, connected_gui_and_bec_with_scope_session, random_generator_from_seed
):
"""Test the WebsiteWidget widget"""
gui = connected_gui_and_bec_with_scope_session
bec = gui._client
# Create dock_area, dock, widget
dock_area, dock, widget = create_widget(qtbot, gui, gui.available_widgets.WebsiteWidget)
# Test rpc calls, maybe add private method to get current url
# widget.set_url("dummy_url")
# widget.set_url("next_dummy_url")
# # Check url
# widget.back()
# # Check url
# widget.forward()
# Check url
# Test removing the widget, or leaving it open for the next test
maybe_remove_widget(qtbot, gui, dock, widget, random_generator_from_seed)
maybe_remove_dock_area(qtbot, gui, dock_area, random_generator_from_seed)

View File

@@ -8,7 +8,7 @@ from bec_widgets.widgets.services.device_browser.device_browser import DeviceBro
from .client_mocks import mocked_client
if TYPE_CHECKING:
if TYPE_CHECKING: # pragma: no cover
from qtpy.QtWidgets import QListWidgetItem
from bec_widgets.widgets.services.device_browser import DeviceItem