Compare commits

...

11 Commits

106 changed files with 2298 additions and 1540 deletions
+4
View File
@@ -17,6 +17,10 @@ on:
required: false
type: string
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
permissions:
pull-requests: write
+5 -5
View File
@@ -2,15 +2,15 @@ from __future__ import annotations
from bec_lib import bec_logger
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
logger = bec_logger.logger
def dock_area(
object_name: str | None = None, profile: str | None = None, start_empty: bool = False
) -> AdvancedDockArea:
) -> BECDockArea:
"""
Create an advanced dock area using Qt Advanced Docking System.
@@ -20,7 +20,7 @@ def dock_area(
start_empty(bool): If True, start with an empty dock area when loading specified profile.
Returns:
AdvancedDockArea: The created advanced dock area.
BECDockArea: The created advanced dock area.
Note:
The "general" profile is mandatory and will always exist. If manually deleted,
@@ -29,7 +29,7 @@ def dock_area(
# Default to "general" profile when called from CLI without specifying a profile
effective_profile = profile if profile is not None else "general"
widget = AdvancedDockArea(
widget = BECDockArea(
object_name=object_name,
restore_initial_profile=True,
root_widget=True,
@@ -51,7 +51,7 @@ def auto_update_dock_area(object_name: str | None = None) -> AutoUpdates:
object_name(str): The name of the dock area.
Returns:
AdvancedDockArea: The created dock area.
BECDockArea: The created dock area.
"""
_auto_update = AutoUpdates(object_name=object_name)
return _auto_update
+43 -52
View File
@@ -27,14 +27,12 @@ from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.name_utils import pascal_to_snake
from bec_widgets.utils.plugin_utils import get_plugin_auto_updates
from bec_widgets.utils.round_frame import RoundedFrame
from bec_widgets.utils.screen_utils import apply_window_geometry, centered_geometry_for_app
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.utils.ui_loader import UILoader
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
from bec_widgets.widgets.containers.advanced_dock_area.profile_utils import (
get_last_profile,
list_profiles,
)
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
from bec_widgets.widgets.containers.dock_area.profile_utils import get_last_profile, list_profiles
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow, BECMainWindowNoRPC
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
@@ -78,23 +76,28 @@ class LaunchTile(RoundedFrame):
circular_pixmap.fill(Qt.transparent)
painter = QPainter(circular_pixmap)
painter.setRenderHints(QPainter.Antialiasing, True)
painter.setRenderHints(QPainter.RenderHint.Antialiasing, True)
path = QPainterPath()
path.addEllipse(0, 0, size, size)
painter.setClipPath(path)
pixmap = pixmap.scaled(size, size, Qt.KeepAspectRatio, Qt.SmoothTransformation)
pixmap = pixmap.scaled(
size,
size,
Qt.AspectRatioMode.KeepAspectRatio,
Qt.TransformationMode.SmoothTransformation,
)
painter.drawPixmap(0, 0, pixmap)
painter.end()
self.icon_label.setPixmap(circular_pixmap)
self.layout.addWidget(self.icon_label, alignment=Qt.AlignCenter)
self.layout.addWidget(self.icon_label, alignment=Qt.AlignmentFlag.AlignCenter)
# Top label
self.top_label = QLabel(top_label.upper())
font_top = self.top_label.font()
font_top.setPointSize(10)
self.top_label.setFont(font_top)
self.layout.addWidget(self.top_label, alignment=Qt.AlignCenter)
self.layout.addWidget(self.top_label, alignment=Qt.AlignmentFlag.AlignCenter)
# Main label
self.main_label = QLabel(main_label)
@@ -104,7 +107,7 @@ class LaunchTile(RoundedFrame):
font_main.setPointSize(14)
font_main.setBold(True)
self.main_label.setFont(font_main)
self.main_label.setAlignment(Qt.AlignCenter)
self.main_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
# Shrink font if the default would wrap on this platform / DPI
content_width = (
@@ -120,13 +123,13 @@ class LaunchTile(RoundedFrame):
self.layout.addWidget(self.main_label)
self.spacer_top = QSpacerItem(0, 10, QSizePolicy.Fixed, QSizePolicy.Fixed)
self.spacer_top = QSpacerItem(0, 10, QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed)
self.layout.addItem(self.spacer_top)
# Description
self.description_label = QLabel(description)
self.description_label.setWordWrap(True)
self.description_label.setAlignment(Qt.AlignCenter)
self.description_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.layout.addWidget(self.description_label)
# Selector
@@ -136,7 +139,9 @@ class LaunchTile(RoundedFrame):
else:
self.selector = None
self.spacer_bottom = QSpacerItem(0, 0, QSizePolicy.Fixed, QSizePolicy.Expanding)
self.spacer_bottom = QSpacerItem(
0, 0, QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Expanding
)
self.layout.addItem(self.spacer_bottom)
# Action button
@@ -156,7 +161,7 @@ class LaunchTile(RoundedFrame):
}
"""
)
self.layout.addWidget(self.action_button, alignment=Qt.AlignCenter)
self.layout.addWidget(self.action_button, alignment=Qt.AlignmentFlag.AlignCenter)
def _fit_label_to_width(self, label: QLabel, max_width: int, min_pt: int = 10):
"""
@@ -179,12 +184,13 @@ class LaunchTile(RoundedFrame):
metrics = QFontMetrics(font)
label.setFont(font)
label.setWordWrap(False)
label.setText(metrics.elidedText(label.text(), Qt.ElideRight, max_width))
label.setText(metrics.elidedText(label.text(), Qt.TextElideMode.ElideRight, max_width))
class LaunchWindow(BECMainWindow):
RPC = True
TILE_SIZE = (250, 300)
DEFAULT_LAUNCH_SIZE = (800, 600)
USER_ACCESS = ["show_launcher", "hide_launcher"]
def __init__(
@@ -209,7 +215,7 @@ class LaunchWindow(BECMainWindow):
self.toolbar = ModularToolBar(parent=self)
self.addToolBar(Qt.TopToolBarArea, self.toolbar)
self.spacer = QWidget(self)
self.spacer.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
self.spacer.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
self.toolbar.addWidget(self.spacer)
self.toolbar.addWidget(self.dark_mode_button)
@@ -318,7 +324,7 @@ class LaunchWindow(BECMainWindow):
)
tile.setFixedWidth(self.TILE_SIZE[0])
tile.setMinimumHeight(self.TILE_SIZE[1])
tile.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.MinimumExpanding)
tile.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.MinimumExpanding)
if action_button:
tile.action_button.clicked.connect(action_button)
if show_selector and selector_items:
@@ -428,7 +434,9 @@ class LaunchWindow(BECMainWindow):
from bec_widgets.applications import bw_launch
with RPCRegister.delayed_broadcast() as rpc_register:
existing_dock_areas = rpc_register.get_names_of_rpc_by_class_type(AdvancedDockArea)
if geometry is None and launch_script != "custom_ui_file":
geometry = self._default_launch_geometry()
existing_dock_areas = rpc_register.get_names_of_rpc_by_class_type(BECDockArea)
if name is not None:
WidgetContainerUtils.raise_for_invalid_name(name)
# If name already exists, generate a unique one with counter suffix
@@ -451,13 +459,13 @@ class LaunchWindow(BECMainWindow):
if launch_script == "auto_update":
auto_update = kwargs.pop("auto_update", None)
return self._launch_auto_update(auto_update)
return self._launch_auto_update(auto_update, geometry=geometry)
if launch_script == "widget":
widget = kwargs.pop("widget", None)
if widget is None:
raise ValueError("Widget name must be provided.")
return self._launch_widget(widget)
return self._launch_widget(widget, geometry=geometry)
launch = getattr(bw_launch, launch_script, None)
if launch is None:
@@ -469,13 +477,13 @@ class LaunchWindow(BECMainWindow):
logger.info(f"Created new dock area: {name}")
if isinstance(result_widget, BECMainWindow):
self._apply_window_geometry(result_widget, geometry)
apply_window_geometry(result_widget, geometry)
result_widget.show()
else:
window = BECMainWindowNoRPC()
window.setCentralWidget(result_widget)
window.setWindowTitle(f"BEC - {result_widget.objectName()}")
self._apply_window_geometry(window, geometry)
apply_window_geometry(window, geometry)
window.show()
return result_widget
@@ -511,12 +519,14 @@ class LaunchWindow(BECMainWindow):
window.setCentralWidget(loaded)
window.setWindowTitle(f"BEC - {filename}")
self._apply_window_geometry(window, None)
apply_window_geometry(window, None)
window.show()
logger.info(f"Launched custom UI: {filename}, type: {type(window).__name__}")
return window
def _launch_auto_update(self, auto_update: str) -> AutoUpdates:
def _launch_auto_update(
self, auto_update: str, geometry: tuple[int, int, int, int] | None = None
) -> AutoUpdates:
if auto_update in self.available_auto_updates:
auto_update_cls = self.available_auto_updates[auto_update]
window = auto_update_cls()
@@ -527,11 +537,13 @@ class LaunchWindow(BECMainWindow):
window.resize(window.minimumSizeHint())
window.setWindowTitle(f"BEC - {window.objectName()}")
self._apply_window_geometry(window, None)
apply_window_geometry(window, geometry)
window.show()
return window
def _launch_widget(self, widget: type[BECWidget]) -> QWidget:
def _launch_widget(
self, widget: type[BECWidget], geometry: tuple[int, int, int, int] | None = None
) -> QWidget:
name = pascal_to_snake(widget.__name__)
WidgetContainerUtils.raise_for_invalid_name(name)
@@ -544,7 +556,7 @@ class LaunchWindow(BECMainWindow):
window.setCentralWidget(widget_instance)
window.resize(window.minimumSizeHint())
window.setWindowTitle(f"BEC - {widget_instance.objectName()}")
self._apply_window_geometry(window, None)
apply_window_geometry(window, geometry)
window.show()
return window
@@ -592,30 +604,9 @@ class LaunchWindow(BECMainWindow):
raise ValueError(f"Widget {widget} not found in available widgets.")
return self.launch("widget", widget=self.available_widgets[widget])
def _apply_window_geometry(
self, window: QWidget, geometry: tuple[int, int, int, int] | None
) -> None:
"""Apply a provided geometry or center the window with an 80% layout."""
if geometry is not None:
window.setGeometry(*geometry)
return
default_geometry = self._default_window_geometry(window)
if default_geometry is not None:
window.setGeometry(*default_geometry)
else:
window.resize(window.minimumSizeHint())
@staticmethod
def _default_window_geometry(window: QWidget) -> tuple[int, int, int, int] | None:
screen = window.screen() or QApplication.primaryScreen()
if screen is None:
return None
available = screen.availableGeometry()
width = int(available.width() * 0.8)
height = int(available.height() * 0.8)
x = available.x() + (available.width() - width) // 2
y = available.y() + (available.height() - height) // 2
return x, y, width, height
def _default_launch_geometry(self) -> tuple[int, int, int, int] | None:
width, height = self.DEFAULT_LAUNCH_SIZE
return centered_geometry_for_app(width=width, height=height)
@SafeSlot(popup_error=True)
def _open_custom_ui_file(self):
@@ -706,7 +697,7 @@ class LaunchWindow(BECMainWindow):
self.hide()
if __name__ == "__main__":
if __name__ == "__main__": # pragma: no cover
import sys
from bec_widgets.utils.colors import apply_theme
+13 -21
View File
@@ -7,7 +7,12 @@ from bec_widgets.applications.views.developer_view.developer_view import Develop
from bec_widgets.applications.views.device_manager_view.device_manager_view import DeviceManagerView
from bec_widgets.applications.views.view import ViewBase, WaveformViewInline, WaveformViewPopup
from bec_widgets.utils.colors import apply_theme
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
from bec_widgets.utils.screen_utils import (
apply_centered_size,
available_screen_geometry,
main_app_size_for_screen,
)
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
@@ -45,7 +50,7 @@ class BECMainApp(BECMainWindow):
def _add_views(self):
self.add_section("BEC Applications", "bec_apps")
self.ads = AdvancedDockArea(self, profile_namespace="bec", auto_profile_namespace=False)
self.ads = BECDockArea(self, profile_namespace="bec", auto_profile_namespace=False)
self.ads.setObjectName("MainWorkspace")
self.device_manager = DeviceManagerView(self)
self.developer_view = DeveloperView(self)
@@ -211,25 +216,12 @@ def main(): # pragma: no cover
apply_theme("dark")
w = BECMainApp(show_examples=args.examples)
screen = app.primaryScreen()
screen_geometry = screen.availableGeometry()
screen_width = screen_geometry.width()
screen_height = screen_geometry.height()
# 70% of screen height, keep 16:9 ratio
height = int(screen_height * 0.9)
width = int(height * (16 / 9))
# If width exceeds screen width, scale down
if width > screen_width * 0.9:
width = int(screen_width * 0.9)
height = int(width / (16 / 9))
w.resize(width, height)
# Center the window on the screen
x = screen_geometry.x() + (screen_geometry.width() - width) // 2
y = screen_geometry.y() + (screen_geometry.height() - height) // 2
w.move(x, y)
screen_geometry = available_screen_geometry()
if screen_geometry is not None:
width, height = main_app_size_for_screen(screen_geometry)
apply_centered_size(w, width, height, available=screen_geometry)
else:
w.resize(w.minimumSizeHint())
w.show()
@@ -13,8 +13,8 @@ from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.actions import MaterialIconAction
from bec_widgets.utils.toolbars.bundles import ToolbarBundle
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
from bec_widgets.widgets.containers.advanced_dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
from bec_widgets.widgets.containers.qt_ads import CDockWidget
from bec_widgets.widgets.editors.monaco.monaco_dock import MonacoDock
from bec_widgets.widgets.editors.monaco.monaco_widget import MonacoWidget
@@ -99,7 +99,7 @@ class DeveloperWidget(DockAreaWidget):
self.monaco = MonacoDock(self)
self.monaco.setObjectName("MonacoEditor")
self.monaco.save_enabled.connect(self._on_save_enabled_update)
self.plotting_ads = AdvancedDockArea(
self.plotting_ads = BECDockArea(
self,
mode="plot",
default_add_direction="bottom",
@@ -38,7 +38,7 @@ from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.actions import MaterialIconAction
from bec_widgets.utils.toolbars.bundles import ToolbarBundle
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.widgets.containers.advanced_dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.control.device_manager.components import (
DeviceTable,
DMConfigView,
+81 -74
View File
@@ -56,7 +56,6 @@ _Widgets = {
"ScatterWaveform": "ScatterWaveform",
"SignalLabel": "SignalLabel",
"TextBox": "TextBox",
"VSCodeEditor": "VSCodeEditor",
"Waveform": "Waveform",
"WebConsole": "WebConsole",
"WebsiteWidget": "WebsiteWidget",
@@ -91,7 +90,63 @@ except ImportError as e:
logger.error(f"Failed loading plugins: \n{reduce(add, traceback.format_exception(e))}")
class AdvancedDockArea(RPCBase):
class AutoUpdates(RPCBase):
@property
@rpc_call
def enabled(self) -> "bool":
"""
Get the enabled status of the auto updates.
"""
@enabled.setter
@rpc_call
def enabled(self) -> "bool":
"""
Get the enabled status of the auto updates.
"""
@property
@rpc_call
def selected_device(self) -> "str | None":
"""
Get the selected device from the auto update config.
Returns:
str: The selected device. If no device is selected, None is returned.
"""
@selected_device.setter
@rpc_call
def selected_device(self) -> "str | None":
"""
Get the selected device from the auto update config.
Returns:
str: The selected device. If no device is selected, None is returned.
"""
class AvailableDeviceResources(RPCBase):
@rpc_call
def remove(self):
"""
Cleanup the BECConnector
"""
@rpc_call
def attach(self):
"""
None
"""
@rpc_call
def detach(self):
"""
Detach the widget from its parent dock widget (if widget is in the dock), making it a floating widget.
"""
class BECDockArea(RPCBase):
@rpc_call
def new(
self,
@@ -321,62 +376,6 @@ class AdvancedDockArea(RPCBase):
"""
class AutoUpdates(RPCBase):
@property
@rpc_call
def enabled(self) -> "bool":
"""
Get the enabled status of the auto updates.
"""
@enabled.setter
@rpc_call
def enabled(self) -> "bool":
"""
Get the enabled status of the auto updates.
"""
@property
@rpc_call
def selected_device(self) -> "str | None":
"""
Get the selected device from the auto update config.
Returns:
str: The selected device. If no device is selected, None is returned.
"""
@selected_device.setter
@rpc_call
def selected_device(self) -> "str | None":
"""
Get the selected device from the auto update config.
Returns:
str: The selected device. If no device is selected, None is returned.
"""
class AvailableDeviceResources(RPCBase):
@rpc_call
def remove(self):
"""
Cleanup the BECConnector
"""
@rpc_call
def attach(self):
"""
None
"""
@rpc_call
def detach(self):
"""
Detach the widget from its parent dock widget (if widget is in the dock), making it a floating widget.
"""
class BECMainWindow(RPCBase):
@rpc_call
def remove(self):
@@ -2502,16 +2501,30 @@ class Image(RPCBase):
@property
@rpc_call
def monitor(self) -> "str":
def device_name(self) -> "str":
"""
The name of the monitor to use for the image.
The name of the device to monitor for image data.
"""
@monitor.setter
@device_name.setter
@rpc_call
def monitor(self) -> "str":
def device_name(self) -> "str":
"""
The name of the monitor to use for the image.
The name of the device to monitor for image data.
"""
@property
@rpc_call
def device_entry(self) -> "str":
"""
The signal/entry name to monitor on the device.
"""
@device_entry.setter
@rpc_call
def device_entry(self) -> "str":
"""
The signal/entry name to monitor on the device.
"""
@rpc_call
@@ -2617,8 +2630,8 @@ class Image(RPCBase):
@rpc_call
def image(
self,
monitor: "str | tuple | None" = None,
monitor_type: "Literal['auto', '1d', '2d']" = "auto",
device_name: "str | None" = None,
device_entry: "str | None" = None,
color_map: "str | None" = None,
color_bar: "Literal['simple', 'full'] | None" = None,
vrange: "tuple[int, int] | None" = None,
@@ -2627,14 +2640,14 @@ class Image(RPCBase):
Set the image source and update the image.
Args:
monitor(str|tuple|None): The name of the monitor to use for the image, or a tuple of (device, signal) for preview signals. If None or empty string, the current monitor will be disconnected.
monitor_type(str): The type of monitor to use. Options are "1d", "2d", or "auto".
device_name(str|None): The name of the device to monitor. If None or empty string, the current monitor will be disconnected.
device_entry(str|None): The signal/entry name to monitor on the device.
color_map(str): The color map to use for the image.
color_bar(str): The type of color bar to use. Options are "simple" or "full".
vrange(tuple): The range of values to use for the color map.
Returns:
ImageItem: The image object.
ImageItem: The image object, or None if connection failed.
"""
@property
@@ -5515,12 +5528,6 @@ class TextBox(RPCBase):
"""
class VSCodeEditor(RPCBase):
"""A widget to display the VSCode editor."""
...
class Waveform(RPCBase):
"""Widget for plotting waveforms."""
+170 -1
View File
@@ -1,10 +1,19 @@
# pylint: skip-file
import json
import time
from unittest.mock import MagicMock
import h5py
from bec_lib import messages
from bec_lib.bec_service import messages
from bec_lib.config_helper import ConfigHelper
from bec_lib.device import Device as BECDevice
from bec_lib.device import Positioner as BECPositioner
from bec_lib.device import ReadoutPriority
from bec_lib.devicemanager import DeviceContainer
from bec_lib.messages import _StoredDataInfo
from bec_lib.scan_history import ScanHistory
from qtpy.QtCore import QEvent, QEventLoop
class FakeDevice(BECDevice):
@@ -219,7 +228,9 @@ class Device(FakeDevice):
class DMMock:
def __init__(self):
def __init__(self, *args, **kwargs):
self._service = args[0]
self.config_helper = ConfigHelper(self._service.connector, self._service._service_name)
self.devices = DeviceContainer()
self.enabled_devices = [device for device in self.devices if device.enabled]
@@ -273,6 +284,10 @@ class DMMock:
configs.append(device._config)
return configs
def initialize(*_): ...
def shutdown(self): ...
DEVICES = [
FakePositioner("samx", limits=[-10, 10], read_value=2.0),
@@ -301,3 +316,157 @@ def check_remote_data_size(widget, plot_name, num_elements):
Used in the qtbot.waitUntil function.
"""
return len(widget.get_all_data()[plot_name]["x"]) == num_elements
class DummyData:
def __init__(self, val, timestamps):
self.val = val
self.timestamps = timestamps
def get(self, key, default=None):
if key == "val":
return self.val
return default
def create_dummy_scan_item():
"""
Helper to create a dummy scan item with both live_data and metadata/status_message info.
"""
dummy_live_data = {
"samx": {"samx": DummyData(val=[10, 20, 30], timestamps=[100, 200, 300])},
"samy": {"samy": DummyData(val=[5, 10, 15], timestamps=[100, 200, 300])},
"bpm4i": {"bpm4i": DummyData(val=[5, 6, 7], timestamps=[101, 201, 301])},
"async_device": {"async_device": DummyData(val=[1, 2, 3], timestamps=[11, 21, 31])},
}
dummy_scan = MagicMock()
dummy_scan.live_data = dummy_live_data
dummy_scan.metadata = {
"bec": {
"scan_id": "dummy",
"scan_report_devices": ["samx"],
"readout_priority": {"monitored": ["bpm4i"], "async": ["async_device"]},
}
}
dummy_scan.status_message.info = {
"readout_priority": {"monitored": ["bpm4i"], "async": ["async_device"]},
"scan_report_devices": ["samx"],
}
return dummy_scan
def inject_scan_history(widget, scan_history_factory, *history_args):
"""
Helper to inject scan history messages into client history.
"""
history_msgs = []
for scan_id, scan_number in history_args:
history_msgs.append(scan_history_factory(scan_id=scan_id, scan_number=scan_number))
widget.client.history = ScanHistory(widget.client, False)
for msg in history_msgs:
widget.client.history._scan_data[msg.scan_id] = msg
widget.client.history._scan_ids.append(msg.scan_id)
widget.client.queue.scan_storage.current_scan = None
return history_msgs
def create_history_file(file_path, data: dict, metadata: dict) -> messages.ScanHistoryMessage:
"""
Helper to create a history file with the given data.
The data should contain readout groups, e.g.
{
"baseline": {"samx": {"samx": {"value": [1, 2, 3], "timestamp": [100, 200, 300]}},
"monitored": {"bpm4i": {"bpm4i": {"value": [5, 6, 7], "timestamp": [101, 201, 301]}}},
"async": {"async_device": {"async_device": {"value": [1, 2, 3], "timestamp": [11, 21, 31]}}},
}
"""
with h5py.File(file_path, "w") as f:
_metadata = f.create_group("entry/collection/metadata")
_metadata.create_dataset("sample_name", data="test_sample")
metadata_bec = f.create_group("entry/collection/metadata/bec")
for key, value in metadata.items():
if isinstance(value, dict):
metadata_bec.create_group(key)
for sub_key, sub_value in value.items():
if isinstance(sub_value, list):
sub_value = json.dumps(sub_value)
metadata_bec[key].create_dataset(sub_key, data=sub_value)
elif isinstance(sub_value, dict):
for sub_sub_key, sub_sub_value in sub_value.items():
sub_sub_group = metadata_bec[key].create_group(sub_key)
# Handle _StoredDataInfo objects
if isinstance(sub_sub_value, _StoredDataInfo):
# Store the numeric shape
sub_sub_group.create_dataset("shape", data=sub_sub_value.shape)
# Store the dtype as a UTF-8 string
dt = sub_sub_value.dtype or ""
sub_sub_group.create_dataset(
"dtype", data=dt, dtype=h5py.string_dtype(encoding="utf-8")
)
continue
if isinstance(sub_sub_value, list):
json_val = json.dumps(sub_sub_value)
sub_sub_group.create_dataset(sub_sub_key, data=json_val)
elif isinstance(sub_sub_value, dict):
for k2, v2 in sub_sub_value.items():
val = json.dumps(v2) if isinstance(v2, list) else v2
sub_sub_group.create_dataset(k2, data=val)
else:
sub_sub_group.create_dataset(sub_sub_key, data=sub_sub_value)
else:
metadata_bec[key].create_dataset(sub_key, data=sub_value)
else:
metadata_bec.create_dataset(key, data=value)
for group, devices in data.items():
readout_group = f.create_group(f"entry/collection/readout_groups/{group}")
for device, device_data in devices.items():
dev_group = f.create_group(f"entry/collection/devices/{device}")
for signal, signal_data in device_data.items():
signal_group = dev_group.create_group(signal)
for signal_key, signal_values in signal_data.items():
signal_group.create_dataset(signal_key, data=signal_values)
readout_group[device] = h5py.SoftLink(f"/entry/collection/devices/{device}")
msg = messages.ScanHistoryMessage(
scan_id=metadata["scan_id"],
scan_name=metadata["scan_name"],
exit_status=metadata["exit_status"],
file_path=file_path,
scan_number=metadata["scan_number"],
dataset_number=metadata["dataset_number"],
start_time=time.time(),
end_time=time.time(),
num_points=metadata["num_points"],
request_inputs=metadata["request_inputs"],
stored_data_info=metadata.get("stored_data_info"),
metadata={"scan_report_devices": metadata.get("scan_report_devices")},
)
return msg
def create_widget(qtbot, widget, *args, **kwargs):
"""
Create a widget and add it to the qtbot for testing. This is a helper function that
should be used in all tests that require a widget to be created.
Args:
qtbot (fixture): pytest-qt fixture
widget (QWidget): widget class to be created
*args: positional arguments for the widget
**kwargs: keyword arguments for the widget
Returns:
QWidget: the created widget
"""
widget = widget(*args, **kwargs)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
return widget
def process_all_deferred_deletes(qapp):
qapp.sendPostedEvents(None, QEvent.Type.DeferredDelete)
qapp.processEvents(QEventLoop.ProcessEventsFlag.AllEvents)
+5 -6
View File
@@ -123,17 +123,16 @@ class BECDispatcher:
self._registered_slots: DefaultDict[Hashable, QtThreadSafeCallback] = (
collections.defaultdict()
)
self.client = client
if self.client is None:
if config is not None:
if not isinstance(config, ServiceConfig):
# config is supposed to be a path
config = ServiceConfig(config)
if client is None:
if config is not None and not isinstance(config, ServiceConfig):
# config is supposed to be a path
config = ServiceConfig(config)
self.client = BECClient(
config=config, connector_cls=QtRedisConnector, name="BECWidgets"
)
else:
self.client = client
if self.client.started:
# have to reinitialize client to use proper connector
logger.info("Shutting down BECClient to switch to QtRedisConnector")
+105 -10
View File
@@ -1,6 +1,7 @@
from __future__ import annotations
import re
from functools import lru_cache
from typing import Literal
import numpy as np
@@ -9,6 +10,7 @@ from bec_lib import bec_logger
from bec_qthemes import apply_theme as apply_theme_global
from bec_qthemes._theme import AccentColors
from pydantic_core import PydanticCustomError
from pyqtgraph.graphicsItems.GradientEditorItem import Gradients
from qtpy.QtCore import QEvent, QEventLoop
from qtpy.QtGui import QColor
from qtpy.QtWidgets import QApplication
@@ -57,6 +59,96 @@ def apply_theme(theme: Literal["dark", "light"]):
class Colors:
@staticmethod
def list_available_colormaps() -> list[str]:
"""
List colormap names available via the pyqtgraph colormap registry.
Note: This does not include `GradientEditorItem` presets (used by HistogramLUT menus).
"""
def _list(source: str | None = None) -> list[str]:
try:
return pg.colormap.listMaps() if source is None else pg.colormap.listMaps(source)
except Exception: # pragma: no cover - backend may be missing
return []
return [*_list(None), *_list("matplotlib"), *_list("colorcet")]
@staticmethod
def list_available_gradient_presets() -> list[str]:
"""
List `GradientEditorItem` preset names (HistogramLUT right-click menu entries).
"""
from pyqtgraph.graphicsItems.GradientEditorItem import Gradients
return list(Gradients.keys())
@staticmethod
def canonical_colormap_name(color_map: str) -> str:
"""
Return an available colormap/preset name if a case-insensitive match exists.
"""
requested = (color_map or "").strip()
if not requested:
return requested
registry = Colors.list_available_colormaps()
presets = Colors.list_available_gradient_presets()
available = set(registry) | set(presets)
if requested in available:
return requested
# Case-insensitive match.
requested_lc = requested.casefold()
for name in available:
if name.casefold() == requested_lc:
return name
return requested
@staticmethod
def get_colormap(color_map: str) -> pg.ColorMap:
"""
Resolve a string into a `pg.ColorMap` using either:
- the `pg.colormap` registry (optionally including matplotlib/colorcet backends), or
- `GradientEditorItem` presets (HistogramLUT right-click menu).
"""
name = Colors.canonical_colormap_name(color_map)
if not name:
raise ValueError("Empty colormap name")
return Colors._get_colormap_cached(name)
@staticmethod
@lru_cache(maxsize=256)
def _get_colormap_cached(name: str) -> pg.ColorMap:
# 1) Registry/backends
try:
cmap = pg.colormap.get(name)
if cmap is not None:
return cmap
except Exception:
pass
for source in ("matplotlib", "colorcet"):
try:
cmap = pg.colormap.get(name, source=source)
if cmap is not None:
return cmap
except Exception:
continue
# 2) Presets -> ColorMap
if name not in Gradients:
raise KeyError(f"Colormap '{name}' not found")
ge = pg.GradientEditorItem()
ge.loadPreset(name)
return ge.colorMap()
@staticmethod
def golden_ratio(num: int) -> list:
@@ -138,7 +230,7 @@ class Colors:
if theme_offset < 0 or theme_offset > 1:
raise ValueError("theme_offset must be between 0 and 1")
cmap = pg.colormap.get(colormap)
cmap = Colors.get_colormap(colormap)
min_pos, max_pos = Colors.set_theme_offset(theme, theme_offset)
# Generate positions that are evenly spaced within the acceptable range
@@ -186,7 +278,7 @@ class Colors:
ValueError: If theme_offset is not between 0 and 1.
"""
cmap = pg.colormap.get(colormap)
cmap = Colors.get_colormap(colormap)
phi = (1 + np.sqrt(5)) / 2 # Golden ratio
golden_angle_conjugate = 1 - (1 / phi) # Approximately 0.38196601125
@@ -452,21 +544,24 @@ class Colors:
Raises:
PydanticCustomError: If colormap is invalid.
"""
available_pg_maps = pg.colormap.listMaps()
available_mpl_maps = pg.colormap.listMaps("matplotlib")
available_mpl_colorcet = pg.colormap.listMaps("colorcet")
available_colormaps = available_pg_maps + available_mpl_maps + available_mpl_colorcet
if color_map not in available_colormaps:
normalized = Colors.canonical_colormap_name(color_map)
try:
Colors.get_colormap(normalized)
except Exception as ext:
logger.warning(f"Colormap validation error: {ext}")
if return_error:
available_colormaps = sorted(
set(Colors.list_available_colormaps())
| set(Colors.list_available_gradient_presets())
)
raise PydanticCustomError(
"unsupported colormap",
f"Colormap '{color_map}' not found in the current installation of pyqtgraph. Choose on the following: {available_colormaps}.",
f"Colormap '{color_map}' not found in the current installation of pyqtgraph. Choose from the following: {available_colormaps}.",
{"wrong_value": color_map},
)
else:
return False
return color_map
return normalized
@staticmethod
def relative_luminance(color: QColor) -> float:
+100
View File
@@ -0,0 +1,100 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from qtpy.QtWidgets import QApplication, QWidget
if TYPE_CHECKING: # pragma: no cover
from qtpy.QtCore import QRect
def available_screen_geometry(*, widget: QWidget | None = None) -> QRect | None:
"""
Get the available geometry of the screen associated with the given widget or application.
Args:
widget(QWidget | None): The widget to get the screen from.
Returns:
QRect | None: The available geometry of the screen, or None if no screen is found.
"""
screen = widget.screen() if widget is not None else None
if screen is None:
app = QApplication.instance()
screen = app.primaryScreen() if app is not None else None
if screen is None:
return None
return screen.availableGeometry()
def centered_geometry(available: "QRect", width: int, height: int) -> tuple[int, int, int, int]:
"""
Calculate centered geometry within the available rectangle.
Args:
available(QRect): The available rectangle to center within.
width(int): The desired width.
height(int): The desired height.
Returns:
tuple[int, int, int, int]: The (x, y, width, height) of the centered geometry.
"""
x = available.x() + (available.width() - width) // 2
y = available.y() + (available.height() - height) // 2
return x, y, width, height
def centered_geometry_for_app(width: int, height: int) -> tuple[int, int, int, int] | None:
available = available_screen_geometry()
if available is None:
return None
return centered_geometry(available, width, height)
def scaled_centered_geometry_for_window(
window: QWidget, *, width_ratio: float = 0.8, height_ratio: float = 0.8
) -> tuple[int, int, int, int] | None:
available = available_screen_geometry(widget=window)
if available is None:
return None
width = int(available.width() * width_ratio)
height = int(available.height() * height_ratio)
return centered_geometry(available, width, height)
def apply_window_geometry(
window: QWidget,
geometry: tuple[int, int, int, int] | None,
*,
width_ratio: float = 0.8,
height_ratio: float = 0.8,
) -> None:
if geometry is not None:
window.setGeometry(*geometry)
return
default_geometry = scaled_centered_geometry_for_window(
window, width_ratio=width_ratio, height_ratio=height_ratio
)
if default_geometry is not None:
window.setGeometry(*default_geometry)
else:
window.resize(window.minimumSizeHint())
def main_app_size_for_screen(available: "QRect") -> tuple[int, int]:
height = int(available.height() * 0.9)
width = int(height * (16 / 9))
if width > available.width() * 0.9:
width = int(available.width() * 0.9)
height = int(width / (16 / 9))
return width, height
def apply_centered_size(
window: QWidget, width: int, height: int, *, available: "QRect" | None = None
) -> None:
if available is None:
available = available_screen_geometry(widget=window)
if available is None:
window.resize(width, height)
return
window.setGeometry(*centered_geometry(available, width, height))
@@ -7,7 +7,7 @@ from bec_lib.logger import bec_logger
from bec_lib.messages import ScanStatusMessage
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
from bec_widgets.widgets.containers.qt_ads import CDockWidget
@@ -37,7 +37,7 @@ class AutoUpdates(BECMainWindow):
):
super().__init__(parent=parent, gui_id=gui_id, window_title=window_title, **kwargs)
self.dock_area = AdvancedDockArea(
self.dock_area = BECDockArea(
parent=self,
object_name="dock_area",
enable_profile_management=False,
@@ -5,7 +5,7 @@ from typing import Literal, Mapping, Sequence
import slugify
from bec_lib import bec_logger
from qtpy.QtCore import QTimer, Signal
from qtpy.QtCore import Signal
from qtpy.QtGui import QPixmap
from qtpy.QtWidgets import (
QApplication,
@@ -31,8 +31,8 @@ from bec_widgets.utils.toolbars.actions import (
from bec_widgets.utils.toolbars.bundles import ToolbarBundle
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.utils.widget_state_manager import WidgetStateManager
from bec_widgets.widgets.containers.advanced_dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.advanced_dock_area.profile_utils import (
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.dock_area.profile_utils import (
SETTINGS_KEYS,
default_profile_candidates,
delete_profile_files,
@@ -55,14 +55,12 @@ from bec_widgets.widgets.containers.advanced_dock_area.profile_utils import (
user_profile_candidates,
write_manifest,
)
from bec_widgets.widgets.containers.advanced_dock_area.settings.dialogs import (
from bec_widgets.widgets.containers.dock_area.settings.dialogs import (
RestoreProfileDialog,
SaveProfileDialog,
)
from bec_widgets.widgets.containers.advanced_dock_area.settings.workspace_manager import (
WorkSpaceManager,
)
from bec_widgets.widgets.containers.advanced_dock_area.toolbar_components.workspace_actions import (
from bec_widgets.widgets.containers.dock_area.settings.workspace_manager import WorkSpaceManager
from bec_widgets.widgets.containers.dock_area.toolbar_components.workspace_actions import (
WorkspaceConnection,
workspace_bundle,
)
@@ -90,7 +88,7 @@ _PROFILE_NAMESPACE_UNSET = object()
PROFILE_STATE_KEYS = {key: SETTINGS_KEYS[key] for key in ("geom", "state", "ads_state")}
class AdvancedDockArea(DockAreaWidget):
class BECDockArea(DockAreaWidget):
RPC = True
PLUGIN = False
USER_ACCESS = [
@@ -1163,7 +1161,7 @@ if __name__ == "__main__": # pragma: no cover
dispatcher = BECDispatcher(gui_id="ads")
window = BECMainWindowNoRPC()
ads = AdvancedDockArea(mode="creator", enable_profile_management=True, root_widget=True)
ads = BECDockArea(mode="creator", enable_profile_management=True, root_widget=True)
window.setCentralWidget(ads)
window.show()
@@ -28,7 +28,7 @@ from qtpy.QtWidgets import (
from bec_widgets import BECWidget, SafeSlot
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.widgets.containers.advanced_dock_area.profile_utils import (
from bec_widgets.widgets.containers.dock_area.profile_utils import (
get_profile_info,
is_quick_select,
list_profiles,
@@ -10,7 +10,7 @@ from bec_widgets import SafeSlot
from bec_widgets.utils.toolbars.actions import MaterialIconAction, WidgetAction
from bec_widgets.utils.toolbars.bundles import ToolbarBundle, ToolbarComponents
from bec_widgets.utils.toolbars.connections import BundleConnection
from bec_widgets.widgets.containers.advanced_dock_area.profile_utils import list_quick_profiles
from bec_widgets.widgets.containers.dock_area.profile_utils import list_quick_profiles
class ProfileComboBox(QComboBox):
@@ -186,6 +186,11 @@ class DeviceComboBox(DeviceInputBase, QComboBox):
device = self.itemData(idx)[0] # type: ignore[assignment]
return super().validate_device(device)
@property
def is_valid_input(self) -> bool:
"""Whether the current text represents a valid device selection."""
return self._is_valid_input
if __name__ == "__main__": # pragma: no cover
# pylint: disable=import-outside-toplevel
@@ -69,11 +69,12 @@ class DeviceTest(QtCore.QRunnable):
enable_connect: bool,
force_connect: bool,
timeout: float,
device_manager_ds: object | None = None,
):
super().__init__()
self.uuid = device_model.uuid
test_config = {device_model.device_name: device_model.device_config}
self.tester = StaticDeviceTest(config_dict=test_config)
self.tester = StaticDeviceTest(config_dict=test_config, device_manager_ds=device_manager_ds)
self.signals = DeviceTestResult()
self.device_config = device_model.device_config
self.enable_connect = enable_connect
@@ -752,11 +753,15 @@ class OphydValidation(BECWidget, QtWidgets.QWidget):
# Remove widget from list as it's safe to assume it can be loaded.
self._remove_device_config(widget.device_model.device_config)
return
dm_ds = None
if self.client:
dm_ds = getattr(self.client, "device_manager", None)
runnable = DeviceTest(
device_model=widget.device_model,
enable_connect=connect,
force_connect=force_connect,
timeout=timeout,
device_manager_ds=dm_ds,
)
widget.validation_scheduled()
if self.thread_pool_manager:
@@ -9,7 +9,7 @@ from bec_lib.macro_update_handler import has_executable_code
from qtpy.QtCore import QEvent, QTimer, Signal
from qtpy.QtWidgets import QFileDialog, QMessageBox, QToolButton, QWidget
from bec_widgets.widgets.containers.advanced_dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.qt_ads import CDockAreaWidget, CDockWidget
from bec_widgets.widgets.editors.monaco.monaco_widget import MonacoWidget
@@ -1,15 +0,0 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.editors.vscode.vs_code_editor_plugin import VSCodeEditorPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(VSCodeEditorPlugin())
if __name__ == "__main__": # pragma: no cover
main()
@@ -1 +0,0 @@
{'files': ['vscode.py']}
@@ -1,57 +0,0 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from qtpy.QtWidgets import QWidget
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.editors.vscode.vscode import VSCodeEditor
DOM_XML = """
<ui language='c++'>
<widget class='VSCodeEditor' name='vs_code_editor'>
</widget>
</ui>
"""
class VSCodeEditorPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
if parent is None:
return QWidget()
t = VSCodeEditor(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return "BEC Developer"
def icon(self):
return designer_material_icon(VSCodeEditor.ICON_NAME)
def includeFile(self):
return "vs_code_editor"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "VSCodeEditor"
def toolTip(self):
return ""
def whatsThis(self):
return self.toolTip()
@@ -1,203 +0,0 @@
import os
import select
import shlex
import signal
import socket
import subprocess
from typing import Literal
from pydantic import BaseModel
from qtpy.QtCore import Signal, Slot
from bec_widgets.widgets.editors.website.website import WebsiteWidget
class VSCodeInstructionMessage(BaseModel):
command: Literal["open", "write", "close", "zenMode", "save", "new", "setCursor"]
content: str = ""
def get_free_port():
"""
Get a free port on the local machine.
Returns:
int: The free port number
"""
sock = socket.socket()
sock.bind(("", 0))
port = sock.getsockname()[1]
sock.close()
return port
class VSCodeEditor(WebsiteWidget):
"""
A widget to display the VSCode editor.
"""
file_saved = Signal(str)
token = "bec"
host = "127.0.0.1"
PLUGIN = True
USER_ACCESS = []
ICON_NAME = "developer_mode_tv"
def __init__(self, parent=None, config=None, client=None, gui_id=None, **kwargs):
self.process = None
self.port = get_free_port()
self._url = f"http://{self.host}:{self.port}?tkn={self.token}"
super().__init__(parent=parent, config=config, client=client, gui_id=gui_id, **kwargs)
self.start_server()
self.bec_dispatcher.connect_slot(self.on_vscode_event, f"vscode-events/{self.gui_id}")
def start_server(self):
"""
Start the server.
This method starts the server for the VSCode editor in a subprocess.
"""
env = os.environ.copy()
env["BEC_Widgets_GUIID"] = self.gui_id
env["BEC_REDIS_HOST"] = self.client.connector.host
cmd = shlex.split(
f"code serve-web --port {self.port} --connection-token={self.token} --accept-server-license-terms"
)
self.process = subprocess.Popen(
cmd,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
preexec_fn=os.setsid,
env=env,
)
os.set_blocking(self.process.stdout.fileno(), False)
while self.process.poll() is None:
readylist, _, _ = select.select([self.process.stdout], [], [], 1)
if self.process.stdout in readylist:
output = self.process.stdout.read(1024)
if output and f"available at {self._url}" in output:
break
self.set_url(self._url)
self.wait_until_loaded()
@Slot(str)
def open_file(self, file_path: str):
"""
Open a file in the VSCode editor.
Args:
file_path: The file path to open
"""
msg = VSCodeInstructionMessage(command="open", content=f"file://{file_path}")
self.client.connector.raw_send(f"vscode-instructions/{self.gui_id}", msg.model_dump_json())
@Slot(dict, dict)
def on_vscode_event(self, content, _metadata):
"""
Handle the VSCode event. VSCode events are received as RawMessages.
Args:
content: The content of the event
metadata: The metadata of the event
"""
# the message also contains the content but I think is fine for now to just emit the file path
if not isinstance(content["data"], dict):
return
if "uri" not in content["data"]:
return
if not content["data"]["uri"].startswith("file://"):
return
file_path = content["data"]["uri"].split("file://")[1]
self.file_saved.emit(file_path)
@Slot()
def save_file(self):
"""
Save the file in the VSCode editor.
"""
msg = VSCodeInstructionMessage(command="save")
self.client.connector.raw_send(f"vscode-instructions/{self.gui_id}", msg.model_dump_json())
@Slot()
def new_file(self):
"""
Create a new file in the VSCode editor.
"""
msg = VSCodeInstructionMessage(command="new")
self.client.connector.raw_send(f"vscode-instructions/{self.gui_id}", msg.model_dump_json())
@Slot()
def close_file(self):
"""
Close the file in the VSCode editor.
"""
msg = VSCodeInstructionMessage(command="close")
self.client.connector.raw_send(f"vscode-instructions/{self.gui_id}", msg.model_dump_json())
@Slot(str)
def write_file(self, content: str):
"""
Write content to the file in the VSCode editor.
Args:
content: The content to write
"""
msg = VSCodeInstructionMessage(command="write", content=content)
self.client.connector.raw_send(f"vscode-instructions/{self.gui_id}", msg.model_dump_json())
@Slot()
def zen_mode(self):
"""
Toggle the Zen mode in the VSCode editor.
"""
msg = VSCodeInstructionMessage(command="zenMode")
self.client.connector.raw_send(f"vscode-instructions/{self.gui_id}", msg.model_dump_json())
@Slot(int, int)
def set_cursor(self, line: int, column: int):
"""
Set the cursor in the VSCode editor.
Args:
line: The line number
column: The column number
"""
msg = VSCodeInstructionMessage(command="setCursor", content=f"{line},{column}")
self.client.connector.raw_send(f"vscode-instructions/{self.gui_id}", msg.model_dump_json())
def cleanup_vscode(self):
"""
Cleanup the VSCode editor.
"""
if not self.process or self.process.poll() is not None:
return
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
self.process.wait()
def cleanup(self):
"""
Cleanup the widget. This method is called from the dock area when the widget is removed.
"""
self.bec_dispatcher.disconnect_slot(self.on_vscode_event, f"vscode-events/{self.gui_id}")
self.cleanup_vscode()
return super().cleanup()
if __name__ == "__main__": # pragma: no cover
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
widget = VSCodeEditor(gui_id="unknown")
widget.show()
app.exec_()
widget.bec_dispatcher.disconnect_all()
widget.client.shutdown()
File diff suppressed because it is too large Load Diff
+47 -10
View File
@@ -9,6 +9,7 @@ from pydantic import BaseModel, ConfigDict, Field, ValidationError
from qtpy.QtCore import QPointF, Signal, SignalInstance
from qtpy.QtWidgets import QDialog, QVBoxLayout
from bec_widgets.utils import Colors
from bec_widgets.utils.container_utils import WidgetContainerUtils
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.utils.side_panel import SidePanel
@@ -131,8 +132,9 @@ class ImageLayerManager:
image.setZValue(z_position)
image.removed.connect(self._remove_destroyed_layer)
# FIXME: For now, we hard-code the default color map here. In the future, this should be configurable.
image.color_map = "plasma"
color_map = getattr(getattr(self.parent, "config", None), "color_map", None)
if color_map:
image.color_map = color_map
self.layers[name] = ImageLayer(name=name, image=image, sync=sync)
self.plot_item.addItem(image)
@@ -249,6 +251,8 @@ class ImageBase(PlotBase):
Base class for the Image widget.
"""
MAX_TICKS_COLORBAR = 10
sync_colorbar_with_autorange = Signal()
image_updated = Signal()
layer_added = Signal(str)
@@ -460,18 +464,20 @@ class ImageBase(PlotBase):
self.setProperty("autorange", False)
if style == "simple":
self._color_bar = pg.ColorBarItem(colorMap=self.config.color_map)
cmap = Colors.get_colormap(self.config.color_map)
self._color_bar = pg.ColorBarItem(colorMap=cmap)
self._color_bar.setImageItem(self.layer_manager["main"].image)
self._color_bar.sigLevelsChangeFinished.connect(disable_autorange)
self.config.color_bar = "simple"
elif style == "full":
self._color_bar = pg.HistogramLUTItem()
self._color_bar.setImageItem(self.layer_manager["main"].image)
self._color_bar.gradient.loadPreset(self.config.color_map)
self.config.color_bar = "full"
self._apply_colormap_to_colorbar(self.config.color_map)
self._color_bar.sigLevelsChanged.connect(disable_autorange)
self.plot_widget.addItem(self._color_bar, row=0, col=1)
self.config.color_bar = style
else:
if self._color_bar:
self.plot_widget.removeItem(self._color_bar)
@@ -484,6 +490,37 @@ class ImageBase(PlotBase):
if vrange: # should be at the end to disable the autorange if defined
self.v_range = vrange
def _apply_colormap_to_colorbar(self, color_map: str) -> None:
if not self._color_bar:
return
cmap = Colors.get_colormap(color_map)
if self.config.color_bar == "simple":
self._color_bar.setColorMap(cmap)
return
if self.config.color_bar != "full":
return
gradient = getattr(self._color_bar, "gradient", None)
if gradient is None:
return
positions = np.linspace(0.0, 1.0, self.MAX_TICKS_COLORBAR)
colors = cmap.map(positions, mode="byte")
colors = np.asarray(colors)
if colors.ndim != 2:
return
if colors.shape[1] == 3: # add alpha
alpha = np.full((colors.shape[0], 1), 255, dtype=colors.dtype)
colors = np.concatenate([colors, alpha], axis=1)
ticks = [(float(p), tuple(int(x) for x in c)) for p, c in zip(positions, colors)]
state = {"mode": "rgb", "ticks": ticks}
gradient.restoreState(state)
################################################################################
# Static rois with roi manager
@@ -754,11 +791,11 @@ class ImageBase(PlotBase):
layer.image.color_map = value
if self._color_bar:
if self.config.color_bar == "simple":
self._color_bar.setColorMap(value)
elif self.config.color_bar == "full":
self._color_bar.gradient.loadPreset(value)
except ValidationError:
self._apply_colormap_to_colorbar(self.config.color_map)
except ValidationError as exc:
logger.warning(
f"Colormap '{value}' is not available; keeping '{self.config.color_map}'. {exc}"
)
return
@SafeProperty("QPointF")
@@ -119,7 +119,8 @@ class ImageItem(BECConnector, pg.ImageItem):
"""Set a new color map."""
try:
self.config.color_map = value
self.setColorMap(value)
cmap = Colors.get_colormap(self.config.color_map)
self.setColorMap(cmap)
except ValidationError:
logger.error(f"Invalid colormap '{value}' provided.")
@@ -0,0 +1,255 @@
from qtpy.QtWidgets import QHBoxLayout, QSizePolicy, QWidget
from bec_widgets.utils.toolbars.actions import WidgetAction
from bec_widgets.utils.toolbars.bundles import ToolbarBundle, ToolbarComponents
from bec_widgets.utils.toolbars.connections import BundleConnection
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
class DeviceSelection(QWidget):
"""Device and signal selection widget for image toolbar."""
def __init__(self, parent=None, client=None):
super().__init__(parent=parent)
self.client = client
self.supported_signals = [
"PreviewSignal",
"AsyncSignal",
"AsyncMultiSignal",
"DynamicSignal",
]
# Create device combobox with signal class filter
# This will only show devices that have signals matching the supported signal classes
self.device_combo_box = DeviceComboBox(
parent=self, client=self.client, signal_class_filter=self.supported_signals
)
self.device_combo_box.setToolTip("Select Device")
self.device_combo_box.setEditable(True)
# Set expanding size policy so it grows with available space
self.device_combo_box.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed)
self.device_combo_box.lineEdit().setPlaceholderText("Select Device")
# Configure SignalComboBox to filter by PreviewSignal and supported async signals
# Also filter by ndim (1D and 2D only) for Image widget
self.signal_combo_box = SignalComboBox(
parent=self,
client=self.client,
signal_class_filter=[
"PreviewSignal",
"AsyncSignal",
"AsyncMultiSignal",
"DynamicSignal",
],
ndim_filter=[1, 2], # Only show 1D and 2D signals for Image widget
store_signal_config=True,
require_device=True,
)
self.signal_combo_box.setToolTip("Select Signal")
self.signal_combo_box.setEditable(True)
# Set expanding size policy so it grows with available space
self.signal_combo_box.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed)
self.signal_combo_box.lineEdit().setPlaceholderText("Select Signal")
# Connect comboboxes together
self.device_combo_box.currentTextChanged.connect(self.signal_combo_box.set_device)
self.device_combo_box.device_reset.connect(self.signal_combo_box.reset_selection)
# Simple horizontal layout with stretch to fill space
layout = QHBoxLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(2)
layout.addWidget(self.device_combo_box, stretch=1)
layout.addWidget(self.signal_combo_box, stretch=1)
def set_device_and_signal(self, device_name: str | None, device_entry: str | None) -> None:
"""Set the displayed device and signal without emitting selection signals."""
device_name = device_name or ""
device_entry = device_entry or ""
self.device_combo_box.blockSignals(True)
self.signal_combo_box.blockSignals(True)
try:
if device_name:
# Set device in device_combo_box
index = self.device_combo_box.findText(device_name)
if index >= 0:
self.device_combo_box.setCurrentIndex(index)
else:
# Device not found in list, but still set it
self.device_combo_box.setCurrentText(device_name)
# Only update signal combobox device filter if it's actually changing
# This prevents redundant repopulation which can cause duplicates !!!!
current_device = getattr(self.signal_combo_box, "_device", None)
if current_device != device_name:
self.signal_combo_box.set_device(device_name)
# Sync signal combobox selection
if device_entry:
# Try to find the signal by component_name (which is what's displayed)
found = False
for i in range(self.signal_combo_box.count()):
text = self.signal_combo_box.itemText(i)
config_data = self.signal_combo_box.itemData(i)
# Check if this matches our signal
if config_data:
component_name = config_data.get("component_name", "")
if text == component_name or text == device_entry:
self.signal_combo_box.setCurrentIndex(i)
found = True
break
if not found:
# Fallback: try to match the device_entry directly
index = self.signal_combo_box.findText(device_entry)
if index >= 0:
self.signal_combo_box.setCurrentIndex(index)
else:
# No device set, clear selections
self.device_combo_box.setCurrentText("")
self.signal_combo_box.reset_selection()
finally:
# Always unblock signals
self.device_combo_box.blockSignals(False)
self.signal_combo_box.blockSignals(False)
def set_connection_status(self, status: str, message: str | None = None) -> None:
tooltip = f"Connection status: {status}"
if message:
tooltip = f"{tooltip}\n{message}"
self.device_combo_box.setToolTip(tooltip)
self.signal_combo_box.setToolTip(tooltip)
if not self.device_combo_box.is_valid_input or not self.signal_combo_box.is_valid_input:
return
if status == "error":
style = "border: 1px solid orange;"
else:
style = "border: 1px solid transparent;"
self.device_combo_box.setStyleSheet(style)
self.signal_combo_box.setStyleSheet(style)
def cleanup(self):
"""Clean up the widget resources."""
self.device_combo_box.close()
self.device_combo_box.deleteLater()
self.signal_combo_box.close()
self.signal_combo_box.deleteLater()
def device_selection_bundle(components: ToolbarComponents, client=None) -> ToolbarBundle:
"""
Creates a device selection toolbar bundle for Image widget.
Includes a resizable splitter after the device selection. All subsequent bundles'
actions will appear compactly after the splitter with no gaps.
Args:
components (ToolbarComponents): The components to be added to the bundle.
client: The BEC client instance.
Returns:
ToolbarBundle: The device selection toolbar bundle.
"""
device_selection_widget = DeviceSelection(parent=components.toolbar, client=client)
components.add_safe(
"device_selection", WidgetAction(widget=device_selection_widget, adjust_size=False)
)
bundle = ToolbarBundle("device_selection", components)
bundle.add_action("device_selection")
bundle.add_splitter(
name="device_selection_splitter",
target_widget=device_selection_widget,
min_width=210,
max_width=600,
)
return bundle
class DeviceSelectionConnection(BundleConnection):
"""
Connection helper for the device selection bundle.
"""
def __init__(self, components: ToolbarComponents, target_widget=None):
super().__init__(parent=components.toolbar)
self.bundle_name = "device_selection"
self.components = components
self.target_widget = target_widget
self._connected = False
self.register_property_sync("device_name", self._sync_from_device_name)
self.register_property_sync("device_entry", self._sync_from_device_entry)
self.register_property_sync("connection_status", self._sync_connection_status)
self.register_property_sync("connection_error", self._sync_connection_status)
def _widget(self) -> DeviceSelection:
return self.components.get_action("device_selection").widget
def connect(self):
if self._connected:
return
widget = self._widget()
widget.device_combo_box.device_selected.connect(
self.target_widget.on_device_selection_changed
)
widget.signal_combo_box.device_signal_changed.connect(
self.target_widget.on_device_selection_changed
)
self.connect_property_sync(self.target_widget)
self._connected = True
def disconnect(self):
if not self._connected:
return
widget = self._widget()
widget.device_combo_box.device_selected.disconnect(
self.target_widget.on_device_selection_changed
)
widget.signal_combo_box.device_signal_changed.disconnect(
self.target_widget.on_device_selection_changed
)
self.disconnect_property_sync(self.target_widget)
self._connected = False
widget.cleanup()
def _sync_from_device_name(self, _):
try:
widget = self._widget()
except Exception:
return
widget.set_device_and_signal(
self.target_widget.device_name, self.target_widget.device_entry
)
self.target_widget._sync_device_entry_from_toolbar()
def _sync_from_device_entry(self, _):
try:
widget = self._widget()
except Exception:
return
widget.set_device_and_signal(
self.target_widget.device_name, self.target_widget.device_entry
)
def _sync_connection_status(self, _):
try:
widget = self._widget()
except Exception:
return
widget.set_connection_status(
self.target_widget._config.connection_status,
self.target_widget._config.connection_error,
)
+4 -4
View File
@@ -32,7 +32,7 @@ dock_area = gui.new()
img_widget = dock_area.new().new(gui.available_widgets.Image)
# Add an ImageWidget to the BECFigure for a 2D detector
img_widget.image(monitor='eiger', monitor_type='2d')
img_widget.image(device_name='eiger', device_entry='preview')
img_widget.title = "Camera Image - Eiger Detector"
```
@@ -46,7 +46,7 @@ dock_area = gui.new()
img_widget = dock_area.new().new(gui.available_widgets.Image)
# Add an ImageWidget to the BECFigure for a 2D detector
img_widget.image(monitor='waveform', monitor_type='1d')
img_widget.image(device_name='waveform', device_entry='data')
img_widget.title = "Line Detector Data"
# Optional: Set the color map and value range
@@ -84,7 +84,7 @@ The Image Widget can be configured for different detectors by specifying the cor
```python
# For a 2D camera detector
img_widget = fig.image(monitor='eiger', monitor_type='2d')
img_widget = fig.image(device_name='eiger', device_entry='preview')
img_widget.set_title("Eiger Camera Image")
```
@@ -92,7 +92,7 @@ img_widget.set_title("Eiger Camera Image")
```python
# For a 1D line detector
img_widget = fig.image(monitor='waveform', monitor_type='1d')
img_widget = fig.image(device_name='waveform', device_entry='data')
img_widget.set_title("Line Detector Data")
```
+3 -3
View File
@@ -59,7 +59,7 @@ def test_rpc_add_dock_with_plots_e2e(qtbot, bec_client_lib, connected_client_gui
mm.map("samx", "samy")
curve = wf.plot(x_name="samx", y_name="bpm4i")
im_item = im.image("eiger")
im_item = im.image(device_name="eiger", device_entry="preview")
assert curve.__class__.__name__ == "RPCReference"
assert curve.__class__ == RPCReference
@@ -122,12 +122,12 @@ def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
assert gui.windows["bec"] is gui.bec
mw = gui.bec
assert mw.__class__.__name__ == "RPCReference"
assert gui._ipython_registry[mw._gui_id].__class__.__name__ == "AdvancedDockArea"
assert gui._ipython_registry[mw._gui_id].__class__.__name__ == "BECDockArea"
xw = gui.new("X")
xw.delete_all()
assert xw.__class__.__name__ == "RPCReference"
assert gui._ipython_registry[xw._gui_id].__class__.__name__ == "AdvancedDockArea"
assert gui._ipython_registry[xw._gui_id].__class__.__name__ == "BECDockArea"
assert len(gui.windows) == 2
assert gui._gui_is_alive()
@@ -42,7 +42,7 @@ def test_rpc_plotting_shortcuts_init_configs(qtbot, connected_client_gui_obj):
c3 = wf.plot(y=[1, 2, 3], x=[1, 2, 3])
assert c3.object_name == "Curve_0"
im.image(monitor="eiger")
im.image(device_name="eiger", device_entry="preview")
mm.map(x_name="samx", y_name="samy")
sw.plot(x_name="samx", y_name="samy", z_name="bpm4a")
mw.plot(monitor="waveform")
@@ -165,14 +165,14 @@ def test_rpc_image(qtbot, bec_client_lib, connected_client_gui_obj):
scans = client.scans
im = dock_area.new("Image")
im.image(monitor="eiger")
im.image(device_name="eiger", device_entry="preview")
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
status.wait()
last_image_device = client.connector.get_last(MessageEndpoints.device_monitor_2d("eiger"))[
"data"
].data
last_image_device = client.connector.get_last(
MessageEndpoints.device_preview("eiger", "preview")
)["data"].data
last_image_plot = im.main_image.get_data()
# check plotted data
+3 -2
View File
@@ -15,7 +15,7 @@ def test_rpc_reference_objects(connected_client_gui_obj):
plt.plot(x_name="samx", y_name="bpm4i")
im = dock_area.new("Image")
im.image("eiger")
im.image(device_name="eiger", device_entry="preview")
motor_map = dock_area.new("MotorMap")
motor_map.map("samx", "samy")
plt_z = dock_area.new("Waveform")
@@ -23,7 +23,8 @@ def test_rpc_reference_objects(connected_client_gui_obj):
assert len(plt_z.curves) == 1
assert len(plt.curves) == 1
assert im.monitor == "eiger"
assert im.device_name == "eiger"
assert im.device_entry == "preview"
assert isinstance(im.main_image, RPCReference)
image_item = gui._ipython_registry.get(im.main_image._gui_id, None)
@@ -16,6 +16,7 @@ from typing import TYPE_CHECKING
import numpy as np
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.rpc.rpc_base import RPCBase, RPCReference
@@ -233,7 +234,7 @@ def test_widgets_e2e_image(qtbot, connected_client_gui_obj, random_generator_fro
scans = bec.scans
dev = bec.device_manager.devices
# Test rpc calls
img = widget.image(dev.eiger)
img = widget.image(device_name=dev.eiger.name, device_entry="preview")
assert img.get_data() is None
# Run a scan and plot the image
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
@@ -247,13 +248,13 @@ def test_widgets_e2e_image(qtbot, connected_client_gui_obj, random_generator_fro
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# Check that last image is equivalent to data in Redis
last_img = bec.device_monitor.get_data(
dev.eiger, count=1
) # Get last image from Redis monitor 2D endpoint
last_img = bec.connector.get_last(MessageEndpoints.device_preview("eiger", "preview"))[
"data"
].data
assert np.allclose(img.get_data(), last_img)
# Now add a device with a preview signal
img = widget.image(["eiger", "preview"])
img = widget.image(device_name="eiger", device_entry="preview")
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
s.wait()
-256
View File
@@ -1,256 +0,0 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
from math import inf
from unittest.mock import MagicMock, patch
import fakeredis
import pytest
from bec_lib.bec_service import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.redis_connector import RedisConnector
from bec_lib.scan_history import ScanHistory
from bec_widgets.tests.utils import DEVICES, DMMock, FakePositioner, Positioner
def fake_redis_server(host, port, **kwargs):
redis = fakeredis.FakeRedis()
return redis
@pytest.fixture(scope="function")
def mocked_client(bec_dispatcher):
connector = RedisConnector("localhost:1", redis_cls=fake_redis_server)
# Create a MagicMock object
client = MagicMock() # TODO change to real BECClient
# Shutdown the original client
bec_dispatcher.client.shutdown()
# Mock the connector attribute
bec_dispatcher.client = client
# Mock the device_manager.devices attribute
client.connector = connector
client.device_manager = DMMock()
client.device_manager.add_devices(DEVICES)
def mock_mv(*args, relative=False):
# Extracting motor and value pairs
for i in range(0, len(args), 2):
motor = args[i]
value = args[i + 1]
motor.move(value, relative=relative)
return MagicMock(wait=MagicMock())
client.scans = MagicMock(mv=mock_mv)
# Ensure isinstance check for Positioner passes
original_isinstance = isinstance
def isinstance_mock(obj, class_info):
if class_info == Positioner and isinstance(obj, FakePositioner):
return True
return original_isinstance(obj, class_info)
with patch("builtins.isinstance", new=isinstance_mock):
yield client
connector.shutdown() # TODO change to real BECClient
##################################################
# Client Fixture with DAP
##################################################
@pytest.fixture(scope="function")
def dap_plugin_message():
msg = messages.AvailableResourceMessage(
**{
"resource": {
"GaussianModel": {
"class": "LmfitService1D",
"user_friendly_name": "GaussianModel",
"class_doc": "A model based on a Gaussian or normal distribution lineshape.\n\n The model has three Parameters: `amplitude`, `center`, and `sigma`.\n In addition, parameters `fwhm` and `height` are included as\n constraints to report full width at half maximum and maximum peak\n height, respectively.\n\n .. math::\n\n f(x; A, \\mu, \\sigma) = \\frac{A}{\\sigma\\sqrt{2\\pi}} e^{[{-{(x-\\mu)^2}/{{2\\sigma}^2}}]}\n\n where the parameter `amplitude` corresponds to :math:`A`, `center` to\n :math:`\\mu`, and `sigma` to :math:`\\sigma`. The full width at half\n maximum is :math:`2\\sigma\\sqrt{2\\ln{2}}`, approximately\n :math:`2.3548\\sigma`.\n\n For more information, see: https://en.wikipedia.org/wiki/Normal_distribution\n\n ",
"run_doc": "A model based on a Gaussian or normal distribution lineshape.\n\n The model has three Parameters: `amplitude`, `center`, and `sigma`.\n In addition, parameters `fwhm` and `height` are included as\n constraints to report full width at half maximum and maximum peak\n height, respectively.\n\n .. math::\n\n f(x; A, \\mu, \\sigma) = \\frac{A}{\\sigma\\sqrt{2\\pi}} e^{[{-{(x-\\mu)^2}/{{2\\sigma}^2}}]}\n\n where the parameter `amplitude` corresponds to :math:`A`, `center` to\n :math:`\\mu`, and `sigma` to :math:`\\sigma`. The full width at half\n maximum is :math:`2\\sigma\\sqrt{2\\ln{2}}`, approximately\n :math:`2.3548\\sigma`.\n\n For more information, see: https://en.wikipedia.org/wiki/Normal_distribution\n\n \n Args:\n scan_item (ScanItem): Scan item or scan ID\n device_x (DeviceBase | str): Device name for x\n signal_x (DeviceBase | str): Signal name for x\n device_y (DeviceBase | str): Device name for y\n signal_y (DeviceBase | str): Signal name for y\n parameters (dict): Fit parameters\n ",
"run_name": "fit",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "scan_item",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "ScanItem | str",
},
{
"name": "device_x",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "DeviceBase | str",
},
{
"name": "signal_x",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "DeviceBase | str",
},
{
"name": "device_y",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "DeviceBase | str",
},
{
"name": "signal_y",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "DeviceBase | str",
},
{
"name": "parameters",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "dict",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
"auto_fit_supported": True,
"params": {
"amplitude": {
"name": "amplitude",
"value": 1.0,
"vary": True,
"min": -inf,
"max": inf,
"expr": None,
"brute_step": None,
"user_data": None,
},
"center": {
"name": "center",
"value": 0.0,
"vary": True,
"min": -inf,
"max": inf,
"expr": None,
"brute_step": None,
"user_data": None,
},
"sigma": {
"name": "sigma",
"value": 1.0,
"vary": True,
"min": 0,
"max": inf,
"expr": None,
"brute_step": None,
"user_data": None,
},
"fwhm": {
"name": "fwhm",
"value": 2.35482,
"vary": False,
"min": -inf,
"max": inf,
"expr": "2.3548200*sigma",
"brute_step": None,
"user_data": None,
},
"height": {
"name": "height",
"value": 0.3989423,
"vary": False,
"min": -inf,
"max": inf,
"expr": "0.3989423*amplitude/max(1e-15, sigma)",
"brute_step": None,
"user_data": None,
},
},
"class_args": [],
"class_kwargs": {"model": "GaussianModel"},
}
}
}
)
yield msg
@pytest.fixture(scope="function")
def mocked_client_with_dap(mocked_client, dap_plugin_message):
dap_services = {
"BECClient": messages.StatusMessage(name="BECClient", status=1, info={}),
"DAPServer/LmfitService1D": messages.StatusMessage(
name="LmfitService1D", status=1, info={}
),
}
client = mocked_client
client.service_status = dap_services
client.connector.set(
topic=MessageEndpoints.dap_available_plugins("dap"), msg=dap_plugin_message
)
# Patch the client's DAP attribute so that the available models include "GaussianModel"
patched_models = {"GaussianModel": {}, "LorentzModel": {}, "SineModel": {}}
client.dap._available_dap_plugins = patched_models
yield client
class DummyData:
def __init__(self, val, timestamps):
self.val = val
self.timestamps = timestamps
def get(self, key, default=None):
if key == "val":
return self.val
return default
def create_dummy_scan_item():
"""
Helper to create a dummy scan item with both live_data and metadata/status_message info.
"""
dummy_live_data = {
"samx": {"samx": DummyData(val=[10, 20, 30], timestamps=[100, 200, 300])},
"samy": {"samy": DummyData(val=[5, 10, 15], timestamps=[100, 200, 300])},
"bpm4i": {"bpm4i": DummyData(val=[5, 6, 7], timestamps=[101, 201, 301])},
"async_device": {"async_device": DummyData(val=[1, 2, 3], timestamps=[11, 21, 31])},
}
dummy_scan = MagicMock()
dummy_scan.live_data = dummy_live_data
dummy_scan.metadata = {
"bec": {
"scan_id": "dummy",
"scan_report_devices": ["samx"],
"readout_priority": {"monitored": ["bpm4i"], "async": ["async_device"]},
}
}
dummy_scan.status_message = MagicMock()
dummy_scan.status_message.info = {
"readout_priority": {"monitored": ["bpm4i"], "async": ["async_device"]},
"scan_report_devices": ["samx"],
}
return dummy_scan
def inject_scan_history(widget, scan_history_factory, *history_args):
"""
Helper to inject scan history messages into client history.
"""
history_msgs = []
for scan_id, scan_number in history_args:
history_msgs.append(scan_history_factory(scan_id=scan_id, scan_number=scan_number))
widget.client.history = ScanHistory(widget.client, False)
for msg in history_msgs:
widget.client.history._scan_data[msg.scan_id] = msg
widget.client.history._scan_ids.append(msg.scan_id)
widget.client.queue.scan_storage.current_scan = None
return history_msgs
+216 -105
View File
@@ -1,19 +1,37 @@
import json
import time
from math import inf
from unittest import mock
from unittest.mock import MagicMock, PropertyMock, patch
import fakeredis
import h5py
import numpy as np
import pytest
from bec_lib import messages
from bec_lib import messages, service_config
from bec_lib.bec_service import messages
from bec_lib.client import BECClient
from bec_lib.endpoints import MessageEndpoints
from bec_lib.messages import _StoredDataInfo
from bec_lib.scan_history import ScanHistory
from bec_qthemes import apply_theme
from ophyd._pyepics_shim import _dispatcher
from pytestqt.exceptions import TimeoutError as QtBotTimeoutError
from qtpy.QtCore import QEvent, QEventLoop
from qtpy.QtWidgets import QApplication, QMessageBox
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.tests.utils import (
DEVICES,
DMMock,
FakePositioner,
Positioner,
create_history_file,
process_all_deferred_deletes,
)
from bec_widgets.utils import bec_dispatcher as bec_dispatcher_module
from bec_widgets.utils import error_popups
from bec_widgets.utils.bec_dispatcher import QtRedisConnector
# Patch to set default RAISE_ERROR_DEFAULT to True for tests
# This means that by default, error popups will raise exceptions during tests
@@ -29,11 +47,6 @@ def pytest_runtest_makereport(item, call):
item.stash["failed"] = rep.failed
def process_all_deferred_deletes(qapp):
qapp.sendPostedEvents(None, QEvent.DeferredDelete)
qapp.processEvents(QEventLoop.AllEvents)
@pytest.fixture(autouse=True)
def qapplication(qtbot, request, testable_qtimer_class): # pylint: disable=unused-argument
qapp = QApplication.instance()
@@ -46,7 +59,6 @@ def qapplication(qtbot, request, testable_qtimer_class): # pylint: disable=unus
# if the test failed, we don't want to check for open widgets as
# it simply pollutes the output
# stop pyepics dispatcher for leaking tests
from ophyd._pyepics_shim import _dispatcher
_dispatcher.stop()
if request.node.stash._storage.get("failed"):
@@ -71,9 +83,36 @@ def rpc_register():
RPCRegister.reset_singleton()
_REDIS_CONN: QtRedisConnector | None = None
def global_mock_qt_redis_connector(*_, **__):
global _REDIS_CONN
if _REDIS_CONN is None:
_REDIS_CONN = QtRedisConnector(bootstrap="localhost:1", redis_cls=fakeredis.FakeRedis)
return _REDIS_CONN
def mock_client(*_, **__):
with (
patch("bec_lib.client.DeviceManagerBase", DMMock),
patch("bec_lib.client.DAPPlugins"),
patch("bec_lib.client.Scans"),
patch("bec_lib.client.ScanManager"),
patch("bec_lib.bec_service.BECAccess"),
):
client = BECClient(
config=service_config.ServiceConfig(config={"redis": {"host": "localhost", "port": 1}}),
connector_cls=global_mock_qt_redis_connector,
)
client.start()
return client
@pytest.fixture(autouse=True)
def bec_dispatcher(threads_check): # pylint: disable=unused-argument
bec_dispatcher = bec_dispatcher_module.BECDispatcher()
with mock.patch.object(bec_dispatcher_module, "BECClient", mock_client):
bec_dispatcher = bec_dispatcher_module.BECDispatcher()
yield bec_dispatcher
bec_dispatcher.disconnect_all()
# clean BEC client
@@ -97,103 +136,6 @@ def suppress_message_box(monkeypatch):
monkeypatch.setattr(QMessageBox, "exec_", lambda *args, **kwargs: QMessageBox.Ok)
def create_widget(qtbot, widget, *args, **kwargs):
"""
Create a widget and add it to the qtbot for testing. This is a helper function that
should be used in all tests that require a widget to be created.
Args:
qtbot (fixture): pytest-qt fixture
widget (QWidget): widget class to be created
*args: positional arguments for the widget
**kwargs: keyword arguments for the widget
Returns:
QWidget: the created widget
"""
widget = widget(*args, **kwargs)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
return widget
def create_history_file(file_path, data: dict, metadata: dict) -> messages.ScanHistoryMessage:
"""
Helper to create a history file with the given data.
The data should contain readout groups, e.g.
{
"baseline": {"samx": {"samx": {"value": [1, 2, 3], "timestamp": [100, 200, 300]}},
"monitored": {"bpm4i": {"bpm4i": {"value": [5, 6, 7], "timestamp": [101, 201, 301]}}},
"async": {"async_device": {"async_device": {"value": [1, 2, 3], "timestamp": [11, 21, 31]}}},
}
"""
with h5py.File(file_path, "w") as f:
_metadata = f.create_group("entry/collection/metadata")
_metadata.create_dataset("sample_name", data="test_sample")
metadata_bec = f.create_group("entry/collection/metadata/bec")
for key, value in metadata.items():
if isinstance(value, dict):
metadata_bec.create_group(key)
for sub_key, sub_value in value.items():
if isinstance(sub_value, list):
sub_value = json.dumps(sub_value)
metadata_bec[key].create_dataset(sub_key, data=sub_value)
elif isinstance(sub_value, dict):
for sub_sub_key, sub_sub_value in sub_value.items():
sub_sub_group = metadata_bec[key].create_group(sub_key)
# Handle _StoredDataInfo objects
if isinstance(sub_sub_value, _StoredDataInfo):
# Store the numeric shape
sub_sub_group.create_dataset("shape", data=sub_sub_value.shape)
# Store the dtype as a UTF-8 string
dt = sub_sub_value.dtype or ""
sub_sub_group.create_dataset(
"dtype", data=dt, dtype=h5py.string_dtype(encoding="utf-8")
)
continue
if isinstance(sub_sub_value, list):
json_val = json.dumps(sub_sub_value)
sub_sub_group.create_dataset(sub_sub_key, data=json_val)
elif isinstance(sub_sub_value, dict):
for k2, v2 in sub_sub_value.items():
val = json.dumps(v2) if isinstance(v2, list) else v2
sub_sub_group.create_dataset(k2, data=val)
else:
sub_sub_group.create_dataset(sub_sub_key, data=sub_sub_value)
else:
metadata_bec[key].create_dataset(sub_key, data=sub_value)
else:
metadata_bec.create_dataset(key, data=value)
for group, devices in data.items():
readout_group = f.create_group(f"entry/collection/readout_groups/{group}")
for device, device_data in devices.items():
dev_group = f.create_group(f"entry/collection/devices/{device}")
for signal, signal_data in device_data.items():
signal_group = dev_group.create_group(signal)
for signal_key, signal_values in signal_data.items():
signal_group.create_dataset(signal_key, data=signal_values)
readout_group[device] = h5py.SoftLink(f"/entry/collection/devices/{device}")
msg = messages.ScanHistoryMessage(
scan_id=metadata["scan_id"],
scan_name=metadata["scan_name"],
exit_status=metadata["exit_status"],
file_path=file_path,
scan_number=metadata["scan_number"],
dataset_number=metadata["dataset_number"],
start_time=time.time(),
end_time=time.time(),
num_points=metadata["num_points"],
request_inputs=metadata["request_inputs"],
stored_data_info=metadata.get("stored_data_info"),
metadata={"scan_report_devices": metadata.get("scan_report_devices")},
)
return msg
@pytest.fixture
def grid_scan_history_msg(tmpdir):
x_grid, y_grid = np.meshgrid(np.linspace(-5, 5, 10), np.linspace(-5, 5, 10))
@@ -339,3 +281,172 @@ def scan_history_factory(tmpdir):
return create_history_file(file_path, data, metadata)
return _factory
@pytest.fixture(scope="function")
def mocked_client(bec_dispatcher):
# Ensure isinstance check for Positioner passes
original_isinstance = isinstance
def isinstance_mock(obj, class_info):
if class_info == Positioner and isinstance(obj, FakePositioner):
return True
return original_isinstance(obj, class_info)
with patch("builtins.isinstance", new=isinstance_mock):
yield bec_dispatcher.client
bec_dispatcher.client.connector.shutdown()
@pytest.fixture(scope="function")
def mock_client_w_devices(mocked_client):
mocked_client.device_manager.add_devices(DEVICES)
yield mocked_client
##################################################
# Client Fixture with DAP
##################################################
@pytest.fixture(scope="function")
def dap_plugin_message():
msg = messages.AvailableResourceMessage(
**{
"resource": {
"GaussianModel": {
"class": "LmfitService1D",
"user_friendly_name": "GaussianModel",
"class_doc": "A model based on a Gaussian or normal distribution lineshape.\n\n The model has three Parameters: `amplitude`, `center`, and `sigma`.\n In addition, parameters `fwhm` and `height` are included as\n constraints to report full width at half maximum and maximum peak\n height, respectively.\n\n .. math::\n\n f(x; A, \\mu, \\sigma) = \\frac{A}{\\sigma\\sqrt{2\\pi}} e^{[{-{(x-\\mu)^2}/{{2\\sigma}^2}}]}\n\n where the parameter `amplitude` corresponds to :math:`A`, `center` to\n :math:`\\mu`, and `sigma` to :math:`\\sigma`. The full width at half\n maximum is :math:`2\\sigma\\sqrt{2\\ln{2}}`, approximately\n :math:`2.3548\\sigma`.\n\n For more information, see: https://en.wikipedia.org/wiki/Normal_distribution\n\n ",
"run_doc": "A model based on a Gaussian or normal distribution lineshape.\n\n The model has three Parameters: `amplitude`, `center`, and `sigma`.\n In addition, parameters `fwhm` and `height` are included as\n constraints to report full width at half maximum and maximum peak\n height, respectively.\n\n .. math::\n\n f(x; A, \\mu, \\sigma) = \\frac{A}{\\sigma\\sqrt{2\\pi}} e^{[{-{(x-\\mu)^2}/{{2\\sigma}^2}}]}\n\n where the parameter `amplitude` corresponds to :math:`A`, `center` to\n :math:`\\mu`, and `sigma` to :math:`\\sigma`. The full width at half\n maximum is :math:`2\\sigma\\sqrt{2\\ln{2}}`, approximately\n :math:`2.3548\\sigma`.\n\n For more information, see: https://en.wikipedia.org/wiki/Normal_distribution\n\n \n Args:\n scan_item (ScanItem): Scan item or scan ID\n device_x (DeviceBase | str): Device name for x\n signal_x (DeviceBase | str): Signal name for x\n device_y (DeviceBase | str): Device name for y\n signal_y (DeviceBase | str): Signal name for y\n parameters (dict): Fit parameters\n ",
"run_name": "fit",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "scan_item",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "ScanItem | str",
},
{
"name": "device_x",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "DeviceBase | str",
},
{
"name": "signal_x",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "DeviceBase | str",
},
{
"name": "device_y",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "DeviceBase | str",
},
{
"name": "signal_y",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "DeviceBase | str",
},
{
"name": "parameters",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "dict",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
"auto_fit_supported": True,
"params": {
"amplitude": {
"name": "amplitude",
"value": 1.0,
"vary": True,
"min": -inf,
"max": inf,
"expr": None,
"brute_step": None,
"user_data": None,
},
"center": {
"name": "center",
"value": 0.0,
"vary": True,
"min": -inf,
"max": inf,
"expr": None,
"brute_step": None,
"user_data": None,
},
"sigma": {
"name": "sigma",
"value": 1.0,
"vary": True,
"min": 0,
"max": inf,
"expr": None,
"brute_step": None,
"user_data": None,
},
"fwhm": {
"name": "fwhm",
"value": 2.35482,
"vary": False,
"min": -inf,
"max": inf,
"expr": "2.3548200*sigma",
"brute_step": None,
"user_data": None,
},
"height": {
"name": "height",
"value": 0.3989423,
"vary": False,
"min": -inf,
"max": inf,
"expr": "0.3989423*amplitude/max(1e-15, sigma)",
"brute_step": None,
"user_data": None,
},
},
"class_args": [],
"class_kwargs": {"model": "GaussianModel"},
}
}
}
)
yield msg
@pytest.fixture(scope="function")
def mocked_client_with_dap(mocked_client, dap_plugin_message):
mocked_client.device_manager.add_devices(DEVICES)
dap_services = {
"BECClient": messages.StatusMessage(name="BECClient", status=1, info={}),
"DAPServer/LmfitService1D": messages.StatusMessage(
name="LmfitService1D", status=1, info={}
),
}
type(mocked_client).service_status = PropertyMock(return_value=dap_services)
mocked_client.connector.set(
topic=MessageEndpoints.dap_available_plugins("dap"), msg=dap_plugin_message
)
# Patch the client's DAP attribute so that the available models include "GaussianModel"
patched_models = {"GaussianModel": {}, "LorentzModel": {}, "SineModel": {}}
mocked_client.dap._available_dap_plugins = patched_models
yield mocked_client
+3 -2
View File
@@ -1,15 +1,16 @@
# pylint: disable=missing-function-docstring, missing-module-docstring, unused-import
from unittest.mock import MagicMock
import pytest
from bec_widgets.widgets.control.buttons.button_abort.button_abort import AbortButton
from .client_mocks import mocked_client
@pytest.fixture
def abort_button(qtbot, mocked_client):
widget = AbortButton(client=mocked_client)
widget.queue = MagicMock()
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
+1 -2
View File
@@ -1,10 +1,9 @@
import pytest
from qtpy.QtWidgets import QDoubleSpinBox, QLineEdit
from bec_widgets.tests.utils import create_widget
from bec_widgets.widgets.plots.plot_base import PlotBase
from bec_widgets.widgets.plots.setting_menus.axis_settings import AxisSettings
from tests.unit_tests.client_mocks import mocked_client
from tests.unit_tests.conftest import create_widget
@pytest.fixture
+1 -3
View File
@@ -9,8 +9,6 @@ from bec_widgets.utils import BECConnector
from bec_widgets.utils.error_popups import SafeProperty
from bec_widgets.utils.error_popups import SafeSlot as Slot
from .client_mocks import mocked_client
class BECConnectorQObject(BECConnector, QObject): ...
@@ -134,7 +132,7 @@ def test_bec_connector_change_object_name(bec_connector):
assert not any(obj.objectName() == previous_name for obj in all_objects)
def test_bec_connector_export_settings():
def test_bec_connector_export_settings(mocked_client):
class MyWidget(BECConnector, QWidget):
def __init__(self, parent=None, client=None, **kwargs):
+36 -1
View File
@@ -4,10 +4,45 @@ import time
from unittest import mock
import pytest
from bec_lib import service_config
from bec_lib.messages import ScanMessage
from bec_lib.serialization import MsgpackSerialization
from bec_widgets.utils.bec_dispatcher import QtRedisConnector, QtThreadSafeCallback
from bec_widgets.utils.bec_dispatcher import BECDispatcher, QtRedisConnector, QtThreadSafeCallback
def test_init_handles_client_and_config_arg():
# Client passed
self_mock = mock.MagicMock(_initialized=False)
with mock.patch.object(BECDispatcher, "start_cli_server"):
BECDispatcher.__init__(self_mock, client=mock.MagicMock(name="test_client"))
assert "test_client" in repr(self_mock.client)
# No client, service config object
self_mock.reset_mock()
self_mock._initialized = False
with (
mock.patch.object(BECDispatcher, "start_cli_server"),
mock.patch("bec_widgets.utils.bec_dispatcher.BECClient") as client_cls,
):
config = service_config.ServiceConfig()
BECDispatcher.__init__(self_mock, client=None, config=config)
client_cls.assert_called_with(
config=config, connector_cls=QtRedisConnector, name="BECWidgets"
)
# No client, service config string
self_mock.reset_mock()
self_mock._initialized = False
with (
mock.patch.object(BECDispatcher, "start_cli_server"),
mock.patch("bec_widgets.utils.bec_dispatcher.BECClient"),
mock.patch("bec_widgets.utils.bec_dispatcher.ServiceConfig") as svc_cfg,
mock.patch("bec_widgets.utils.bec_dispatcher.isinstance", return_value=False),
):
config = service_config.ServiceConfig()
BECDispatcher.__init__(self_mock, client=None, config="test_str")
svc_cfg.assert_called_with("test_str")
@pytest.fixture
-2
View File
@@ -3,8 +3,6 @@ from bec_lib import messages
from bec_widgets.widgets.services.bec_queue.bec_queue import BECQueue
from .client_mocks import mocked_client
@pytest.fixture
def bec_queue_msg_full():
-2
View File
@@ -9,8 +9,6 @@ from bec_widgets.widgets.services.bec_status_box.bec_status_box import (
BECStatusBox,
)
from .client_mocks import mocked_client
@pytest.fixture
def service_status_fixture():
+1 -3
View File
@@ -5,8 +5,6 @@ from qtpy.QtWidgets import QLabel, QVBoxLayout, QWidget
from bec_widgets import BECWidget
from bec_widgets.widgets.utility.spinner.spinner import SpinnerWidget
from .client_mocks import mocked_client
class _TestBusyWidget(BECWidget, QWidget):
def __init__(
@@ -29,7 +27,7 @@ def widget_busy(qtbot, mocked_client):
@pytest.fixture
def widget_idle(qtbot):
def widget_idle(qtbot, mocked_client):
w = _TestBusyWidget(client=mocked_client, start_busy=False)
qtbot.addWidget(w)
w.resize(320, 200)
+2 -2
View File
@@ -3,13 +3,13 @@ from unittest import mock
import pytest
from bec_widgets.cli.client import AdvancedDockArea
from bec_widgets.cli.client import BECDockArea
from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process
@pytest.fixture
def cli_dock_area():
dock_area = AdvancedDockArea(gui_id="test")
dock_area = BECDockArea(gui_id="test")
with mock.patch.object(dock_area, "_run_rpc") as mock_rpc_call:
with mock.patch.object(dock_area, "_gui_is_alive", return_value=True):
yield dock_area, mock_rpc_call
+1 -2
View File
@@ -2,12 +2,11 @@ from qtpy.QtCore import Qt
from qtpy.QtGui import QColor
from qtpy.QtWidgets import QColorDialog
from bec_widgets.tests.utils import create_widget
from bec_widgets.widgets.utility.visual.color_button_native.color_button_native import (
ColorButtonNative,
)
from .conftest import create_widget
def test_color_button_native(qtbot):
cb = create_widget(qtbot, ColorButtonNative)
+40 -2
View File
@@ -4,12 +4,11 @@ from pydantic import ValidationError
from qtpy.QtGui import QColor
from qtpy.QtWidgets import QVBoxLayout, QWidget
from bec_widgets.tests.utils import create_widget
from bec_widgets.utils import Colors, ConnectionConfig
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import apply_theme
from bec_widgets.widgets.plots.waveform.curve import CurveConfig
from tests.unit_tests.client_mocks import mocked_client
from tests.unit_tests.conftest import create_widget
def test_color_validation_CSS():
@@ -82,6 +81,45 @@ def test_rgba_to_hex():
assert Colors.rgba_to_hex(255, 87, 51) == "#FF5733FF"
def test_canonical_colormap_name_case_insensitive():
available = Colors.list_available_colormaps()
presets = Colors.list_available_gradient_presets()
if not available and not presets:
pytest.skip("No colormaps or presets available to test canonical mapping.")
name = (available or presets)[0]
requested = name.swapcase()
assert Colors.canonical_colormap_name(requested) == name
def test_validate_color_map_returns_canonical_name():
available = Colors.list_available_colormaps()
presets = Colors.list_available_gradient_presets()
if not available and not presets:
pytest.skip("No colormaps or presets available to test validation.")
name = (available or presets)[0]
requested = name.swapcase()
assert Colors.validate_color_map(requested) == name
def test_get_colormap_uses_gradient_preset_fallback(monkeypatch):
presets = Colors.list_available_gradient_presets()
if not presets:
pytest.skip("No gradient presets available to test fallback.")
preset = presets[0]
Colors._get_colormap_cached.cache_clear()
def _raise(*args, **kwargs):
raise Exception("registry unavailable")
monkeypatch.setattr(pg.colormap, "get", _raise)
cmap = Colors._get_colormap_cached(preset)
assert isinstance(cmap, pg.ColorMap)
@pytest.mark.parametrize("num", [10, 100, 400])
def test_evenly_spaced_colors(num):
colors_qcolor = Colors.evenly_spaced_colors(colormap="magma", num=num, format="QColor")
+1 -3
View File
@@ -4,12 +4,10 @@ import pytest
from qtpy.QtCore import QPointF, Qt
from qtpy.QtGui import QTransform
from bec_widgets.tests.utils import create_widget
from bec_widgets.utils import Crosshair
from bec_widgets.widgets.plots.image.image_item import ImageItem
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from tests.unit_tests.client_mocks import mocked_client
from .conftest import create_widget
# pylint: disable = redefined-outer-name
+3 -4
View File
@@ -6,14 +6,13 @@ from bec_lib.scan_history import ScanHistory
from qtpy.QtGui import QValidator
from qtpy.QtWidgets import QComboBox, QVBoxLayout
from bec_widgets.tests.utils import create_widget
from bec_widgets.widgets.plots.waveform.settings.curve_settings.curve_setting import CurveSetting
from bec_widgets.widgets.plots.waveform.settings.curve_settings.curve_tree import (
CurveTree,
ScanIndexValidator,
)
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from tests.unit_tests.client_mocks import dap_plugin_message, mocked_client, mocked_client_with_dap
from tests.unit_tests.conftest import create_widget
##################################################
# CurveSetting
@@ -21,11 +20,11 @@ from tests.unit_tests.conftest import create_widget
@pytest.fixture
def curve_setting_fixture(qtbot, mocked_client):
def curve_setting_fixture(qtbot, mock_client_w_devices):
"""
Creates a CurveSetting widget targeting a mock or real Waveform widget.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
wf = create_widget(qtbot, Waveform, client=mock_client_w_devices)
wf.x_mode = "auto"
curve_setting = create_widget(qtbot, CurveSetting, parent=None, target_widget=wf)
return curve_setting, wf
+1 -3
View File
@@ -1,10 +1,8 @@
import pytest
from bec_widgets.tests.utils import create_widget
from bec_widgets.widgets.dap.dap_combo_box.dap_combo_box import DapComboBox
from .client_mocks import mocked_client
from .conftest import create_widget
@pytest.fixture(scope="function")
def dap_combobox(qtbot, mocked_client):
+1 -1
View File
@@ -7,7 +7,7 @@ from bec_widgets.utils.colors import apply_theme
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
# pylint: disable=unused-import
from .client_mocks import mocked_client
# pylint: disable=redefined-outer-name
-2
View File
@@ -22,8 +22,6 @@ from bec_widgets.widgets.editors.monaco.monaco_dock import MonacoDock
from bec_widgets.widgets.editors.monaco.monaco_widget import MonacoWidget
from bec_widgets.widgets.utility.ide_explorer.ide_explorer import IDEExplorer
from .client_mocks import mocked_client
@pytest.fixture
def developer_view(qtbot, mocked_client):
+9 -11
View File
@@ -14,8 +14,6 @@ from bec_widgets.widgets.services.device_browser.device_item.device_signal_displ
SignalDisplay,
)
from .client_mocks import mocked_client
if TYPE_CHECKING: # pragma: no cover
from qtpy.QtWidgets import QListWidgetItem
@@ -29,8 +27,8 @@ if TYPE_CHECKING: # pragma: no cover
@pytest.fixture
def device_browser(qtbot, mocked_client):
dev_browser = DeviceBrowser(client=mocked_client)
def device_browser(qtbot, mock_client_w_devices):
dev_browser = DeviceBrowser(client=mock_client_w_devices)
dev_browser.dev["samx"].read_configuration = mock.MagicMock()
qtbot.addWidget(dev_browser)
qtbot.waitExposed(dev_browser)
@@ -148,8 +146,8 @@ def test_device_deletion(device_browser, qtbot):
qtbot.waitUntil(lambda: widget.device not in device_browser.dev_list._item_dict, timeout=10000)
def test_signal_display(mocked_client, qtbot):
signal_display = SignalDisplay(client=mocked_client, device="test_device")
def test_signal_display(mock_client_w_devices, qtbot):
signal_display = SignalDisplay(client=mock_client_w_devices, device="test_device")
qtbot.addWidget(signal_display)
device_mock = mock.MagicMock()
signal_display.dev = {"test_device": device_mock}
@@ -158,10 +156,10 @@ def test_signal_display(mocked_client, qtbot):
device_mock.read_configuration.assert_called()
def test_signal_display_no_device(mocked_client, qtbot):
def test_signal_display_no_device(mock_client_w_devices, qtbot):
device_mock = mock.MagicMock()
mocked_client.client.device_manager.devices = {"test_device_1": device_mock}
signal_display = SignalDisplay(client=mocked_client, device="test_device_2")
mock_client_w_devices.device_manager.devices = {"test_device_1": device_mock}
signal_display = SignalDisplay(client=mock_client_w_devices, device="test_device_2")
qtbot.addWidget(signal_display)
assert (
signal_display._content_layout.itemAt(1).widget().text()
@@ -172,11 +170,11 @@ def test_signal_display_no_device(mocked_client, qtbot):
device_mock.read_configuration.assert_not_called()
def test_signal_display_omitted_not_added(mocked_client, qtbot):
def test_signal_display_omitted_not_added(mock_client_w_devices, qtbot):
device_mock = mock.MagicMock(spec=Device)
device_mock._info = {"signals": {"signal_1": {"kind_str": "omitted"}}}
signal_display = SignalDisplay(client=mocked_client, device="test_device_1")
signal_display = SignalDisplay(client=mock_client_w_devices, device="test_device_1")
signal_display.dev = {"test_device_1": device_mock}
signal_display._populate()
@@ -6,8 +6,6 @@ from bec_widgets.widgets.progress.device_initialization_progress_bar.device_init
DeviceInitializationProgressBar,
)
from .client_mocks import mocked_client
@pytest.fixture
def progress_bar(qtbot, mocked_client):
+1 -3
View File
@@ -4,6 +4,7 @@ import pytest
from bec_lib.device import ReadoutPriority
from qtpy.QtWidgets import QWidget
from bec_widgets.tests.utils import create_widget
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import (
BECDeviceFilter,
DeviceInputBase,
@@ -11,9 +12,6 @@ from bec_widgets.widgets.control.device_input.base_classes.device_input_base imp
)
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from .client_mocks import mocked_client
from .conftest import create_widget
# DeviceInputBase is meant to be mixed in a QWidget
class DeviceInputWidget(DeviceInputBase, QWidget):
+8 -10
View File
@@ -7,21 +7,19 @@ from bec_widgets.widgets.control.device_input.device_line_edit.device_line_edit
DeviceLineEdit,
)
from .client_mocks import mocked_client
@pytest.fixture
def device_input_combobox(qtbot, mocked_client):
widget = DeviceComboBox(client=mocked_client)
def device_input_combobox(qtbot, mock_client_w_devices):
widget = DeviceComboBox(client=mock_client_w_devices)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
@pytest.fixture
def device_input_combobox_with_kwargs(qtbot, mocked_client):
def device_input_combobox_with_kwargs(qtbot, mock_client_w_devices):
widget = DeviceComboBox(
client=mocked_client,
client=mock_client_w_devices,
gui_id="test_gui_id",
device_filter=[BECDeviceFilter.POSITIONER],
default="samx",
@@ -74,17 +72,17 @@ def test_get_device_from_input_combobox_init(device_input_combobox):
@pytest.fixture
def device_input_line_edit(qtbot, mocked_client):
widget = DeviceLineEdit(client=mocked_client)
def device_input_line_edit(qtbot, mock_client_w_devices):
widget = DeviceLineEdit(client=mock_client_w_devices)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
@pytest.fixture
def device_input_line_edit_with_kwargs(qtbot, mocked_client):
def device_input_line_edit_with_kwargs(qtbot, mock_client_w_devices):
widget = DeviceLineEdit(
client=mocked_client,
client=mock_client_w_devices,
gui_id="test_gui_id",
device_filter=[BECDeviceFilter.POSITIONER],
default="samx",
@@ -56,8 +56,7 @@ from bec_widgets.widgets.control.device_manager.components.ophyd_validation.vali
ValidationListItem,
)
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
from .client_mocks import mocked_client
from tests.unit_tests.conftest import mocked_client
class TestConstants:
@@ -364,7 +363,7 @@ class TestDeviceTable:
assert hasattr(device_table, "client_callback_id")
def test_device_table_client_device_update_callback(
self, device_table: DeviceTable, mocked_client, qtbot
self, device_table: DeviceTable, mock_client_w_devices, qtbot
):
"""
Test that runs the client device update callback. This should update the status of devices in the table
@@ -375,6 +374,7 @@ class TestDeviceTable:
device from the client and run the callback. The table should update the status of the
removed device to CAN_CONNECT and all others to CONNECTED.
"""
mocked_client = mock_client_w_devices
device_configs_changed_calls = []
requested_update_for_multiple_device_validations = []
@@ -43,8 +43,6 @@ from bec_widgets.widgets.control.device_manager.components.ophyd_validation.ophy
OphydValidation,
)
from .client_mocks import mocked_client
@pytest.fixture
def device_config() -> dict:
+6 -8
View File
@@ -4,6 +4,7 @@ import pytest
from bec_lib.device import Signal
from qtpy.QtWidgets import QWidget
from bec_widgets.tests.utils import create_widget
from bec_widgets.utils.ophyd_kind_util import Kind
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
from bec_widgets.widgets.control.device_input.base_classes.device_signal_input_base import (
@@ -15,9 +16,6 @@ from bec_widgets.widgets.control.device_input.signal_line_edit.signal_line_edit
SignalLineEdit,
)
from .client_mocks import mocked_client
from .conftest import create_widget
class FakeSignal(Signal):
"""Fake signal to test the DeviceSignalInputBase."""
@@ -146,12 +144,12 @@ def test_signal_lineedit(device_signal_line_edit):
def test_device_signal_input_base_cleanup(qtbot, mocked_client):
with mock.patch.object(mocked_client.callbacks, "remove"):
widget = DeviceInputWidget(client=mocked_client)
widget.close()
widget.deleteLater()
widget = DeviceInputWidget(client=mocked_client)
widget.close()
widget.deleteLater()
mocked_client.callbacks.remove.assert_called_once_with(widget._device_update_register)
mocked_client.callbacks.remove.assert_called_once_with(widget._device_update_register)
def test_signal_combobox_get_signal_name_with_item_data(qtbot, device_signal_combobox):
@@ -10,17 +10,14 @@ from qtpy.QtCore import QSettings, Qt, QTimer
from qtpy.QtGui import QPixmap
from qtpy.QtWidgets import QDialog, QMessageBox, QWidget
import bec_widgets.widgets.containers.advanced_dock_area.basic_dock_area as basic_dock_module
import bec_widgets.widgets.containers.advanced_dock_area.profile_utils as profile_utils
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import (
AdvancedDockArea,
SaveProfileDialog,
)
from bec_widgets.widgets.containers.advanced_dock_area.basic_dock_area import (
import bec_widgets.widgets.containers.dock_area.basic_dock_area as basic_dock_module
import bec_widgets.widgets.containers.dock_area.profile_utils as profile_utils
from bec_widgets.widgets.containers.dock_area.basic_dock_area import (
DockAreaWidget,
DockSettingsDialog,
)
from bec_widgets.widgets.containers.advanced_dock_area.profile_utils import (
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea, SaveProfileDialog
from bec_widgets.widgets.containers.dock_area.profile_utils import (
SETTINGS_KEYS,
default_profile_path,
get_profile_info,
@@ -31,28 +28,23 @@ from bec_widgets.widgets.containers.advanced_dock_area.profile_utils import (
load_user_profile_screenshot,
open_default_settings,
open_user_settings,
plugin_profiles_dir,
read_manifest,
restore_user_from_default,
set_quick_select,
user_profile_path,
write_manifest,
)
from bec_widgets.widgets.containers.advanced_dock_area.settings.dialogs import (
from bec_widgets.widgets.containers.dock_area.settings.dialogs import (
PreviewPanel,
RestoreProfileDialog,
)
from bec_widgets.widgets.containers.advanced_dock_area.settings.workspace_manager import (
WorkSpaceManager,
)
from .client_mocks import mocked_client
from bec_widgets.widgets.containers.dock_area.settings.workspace_manager import WorkSpaceManager
@pytest.fixture
def advanced_dock_area(qtbot, mocked_client):
"""Create an AdvancedDockArea instance for testing."""
widget = AdvancedDockArea(client=mocked_client)
widget = BECDockArea(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
@@ -152,7 +144,7 @@ def workspace_manager_target():
"""Mock delete_profile that performs actual file deletion."""
from qtpy.QtWidgets import QMessageBox
from bec_widgets.widgets.containers.advanced_dock_area.profile_utils import (
from bec_widgets.widgets.containers.dock_area.profile_utils import (
delete_profile_files,
is_profile_read_only,
)
@@ -190,7 +182,7 @@ def basic_dock_area(qtbot, mocked_client):
class _NamespaceProfiles:
"""Helper that routes profile file helpers through a namespace."""
def __init__(self, widget: AdvancedDockArea):
def __init__(self, widget: BECDockArea):
self.namespace = widget.profile_namespace
def open_user(self, name: str):
@@ -215,7 +207,7 @@ class _NamespaceProfiles:
return is_quick_select(name, namespace=self.namespace)
def profile_helper(widget: AdvancedDockArea) -> _NamespaceProfiles:
def profile_helper(widget: BECDockArea) -> _NamespaceProfiles:
"""Return a helper wired to the widget's profile namespace."""
return _NamespaceProfiles(widget)
@@ -590,7 +582,7 @@ class TestAdvancedDockAreaInit:
def test_init(self, advanced_dock_area):
assert advanced_dock_area is not None
assert isinstance(advanced_dock_area, AdvancedDockArea)
assert isinstance(advanced_dock_area, BECDockArea)
assert advanced_dock_area.mode == "creator"
assert hasattr(advanced_dock_area, "dock_manager")
assert hasattr(advanced_dock_area, "toolbar")
@@ -598,8 +590,8 @@ class TestAdvancedDockAreaInit:
assert hasattr(advanced_dock_area, "state_manager")
def test_rpc_and_plugin_flags(self):
assert AdvancedDockArea.RPC is True
assert AdvancedDockArea.PLUGIN is False
assert BECDockArea.RPC is True
assert BECDockArea.PLUGIN is False
def test_user_access_list(self):
expected_methods = [
@@ -611,7 +603,7 @@ class TestAdvancedDockAreaInit:
"delete_all",
]
for method in expected_methods:
assert method in AdvancedDockArea.USER_ACCESS
assert method in BECDockArea.USER_ACCESS
class TestDockManagement:
@@ -1421,21 +1413,21 @@ class TestAdvancedDockAreaRestoreAndDialogs:
pix = QPixmap(8, 8)
pix.fill(Qt.red)
monkeypatch.setattr(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.load_user_profile_screenshot",
"bec_widgets.widgets.containers.dock_area.dock_area.load_user_profile_screenshot",
lambda name, namespace=None: pix,
)
monkeypatch.setattr(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.load_default_profile_screenshot",
"bec_widgets.widgets.containers.dock_area.dock_area.load_default_profile_screenshot",
lambda name, namespace=None: pix,
)
monkeypatch.setattr(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.RestoreProfileDialog.confirm",
"bec_widgets.widgets.containers.dock_area.dock_area.RestoreProfileDialog.confirm",
lambda *args, **kwargs: True,
)
with (
patch(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.restore_user_from_default"
"bec_widgets.widgets.containers.dock_area.dock_area.restore_user_from_default"
) as mock_restore,
patch.object(advanced_dock_area, "delete_all") as mock_delete_all,
patch.object(advanced_dock_area, "load_profile") as mock_load_profile,
@@ -1457,20 +1449,20 @@ class TestAdvancedDockAreaRestoreAndDialogs:
advanced_dock_area._current_profile_name = profile_name
advanced_dock_area.isVisible = lambda: False
monkeypatch.setattr(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.load_user_profile_screenshot",
"bec_widgets.widgets.containers.dock_area.dock_area.load_user_profile_screenshot",
lambda name: QPixmap(),
)
monkeypatch.setattr(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.load_default_profile_screenshot",
"bec_widgets.widgets.containers.dock_area.dock_area.load_default_profile_screenshot",
lambda name: QPixmap(),
)
monkeypatch.setattr(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.RestoreProfileDialog.confirm",
"bec_widgets.widgets.containers.dock_area.dock_area.RestoreProfileDialog.confirm",
lambda *args, **kwargs: False,
)
with patch(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.restore_user_from_default"
"bec_widgets.widgets.containers.dock_area.dock_area.restore_user_from_default"
) as mock_restore:
advanced_dock_area.restore_user_profile_from_default()
@@ -1479,7 +1471,7 @@ class TestAdvancedDockAreaRestoreAndDialogs:
def test_restore_user_profile_from_default_no_target(self, advanced_dock_area, monkeypatch):
advanced_dock_area._current_profile_name = None
with patch(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.RestoreProfileDialog.confirm"
"bec_widgets.widgets.containers.dock_area.dock_area.RestoreProfileDialog.confirm"
) as mock_confirm:
advanced_dock_area.restore_user_profile_from_default()
mock_confirm.assert_not_called()
@@ -1723,8 +1715,7 @@ class TestWorkspaceProfileOperations:
return False
with patch(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.SaveProfileDialog",
StubDialog,
"bec_widgets.widgets.containers.dock_area.dock_area.SaveProfileDialog", StubDialog
):
advanced_dock_area.save_profile(profile_name, show_dialog=True)
@@ -1795,8 +1786,7 @@ class TestWorkspaceProfileOperations:
return False
with patch(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.SaveProfileDialog",
StubDialog,
"bec_widgets.widgets.containers.dock_area.dock_area.SaveProfileDialog", StubDialog
):
advanced_dock_area.save_profile(show_dialog=True)
@@ -1859,11 +1849,11 @@ class TestWorkspaceProfileOperations:
with (
patch(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.QMessageBox.question",
"bec_widgets.widgets.containers.dock_area.dock_area.QMessageBox.question",
return_value=QMessageBox.Yes,
) as mock_question,
patch(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.QMessageBox.information",
"bec_widgets.widgets.containers.dock_area.dock_area.QMessageBox.information",
return_value=None,
) as mock_info,
):
@@ -1893,7 +1883,7 @@ class TestWorkspaceProfileOperations:
mock_get_action.return_value.widget = mock_combo
with patch(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.QMessageBox.question"
"bec_widgets.widgets.containers.dock_area.dock_area.QMessageBox.question"
) as mock_question:
mock_question.return_value = QMessageBox.Yes
+1 -3
View File
@@ -1,14 +1,12 @@
import pytest
from bec_widgets.tests.utils import create_widget
from bec_widgets.utils.filter_io import FilterIO
from bec_widgets.widgets.control.device_input.device_line_edit.device_line_edit import (
DeviceLineEdit,
)
from bec_widgets.widgets.dap.dap_combo_box.dap_combo_box import DapComboBox
from .client_mocks import mocked_client
from .conftest import create_widget
@pytest.fixture(scope="function")
def dap_mock(qtbot, mocked_client):
-3
View File
@@ -16,9 +16,6 @@ from bec_widgets.widgets.plots.heatmap.heatmap import (
)
# pytest: disable=unused-import
from tests.unit_tests.client_mocks import mocked_client
from .client_mocks import create_dummy_scan_item
@pytest.fixture
-2
View File
@@ -9,8 +9,6 @@ from bec_widgets.utils.help_inspector.help_inspector import HelpInspector
from bec_widgets.utils.widget_io import WidgetHierarchy
from bec_widgets.widgets.control.buttons.button_abort.button_abort import AbortButton
from .client_mocks import mocked_client
@pytest.fixture
def help_inspector(qtbot, mocked_client):
-1
View File
@@ -4,7 +4,6 @@ import pyqtgraph as pg
import pytest
from bec_widgets.widgets.plots.image.image_base import ImageLayerManager
from bec_widgets.widgets.plots.image.image_item import ImageItem
@pytest.fixture()
+1 -2
View File
@@ -4,11 +4,10 @@ import numpy as np
import pytest
from qtpy.QtCore import QPointF, Qt
from bec_widgets.tests.utils import create_widget
from bec_widgets.widgets.plots.image.image import Image
from bec_widgets.widgets.plots.image.setting_widgets.image_roi_tree import ROIPropertyTree
from bec_widgets.widgets.plots.roi.image_roi import CircularROI, RectangularROI
from tests.unit_tests.client_mocks import mocked_client
from tests.unit_tests.conftest import create_widget
@pytest.fixture
+1 -2
View File
@@ -5,6 +5,7 @@ from typing import Literal
import numpy as np
import pytest
from bec_widgets.tests.utils import create_widget
from bec_widgets.widgets.plots.image.image import Image
from bec_widgets.widgets.plots.roi.image_roi import (
CircularROI,
@@ -12,8 +13,6 @@ from bec_widgets.widgets.plots.roi.image_roi import (
RectangularROI,
ROIController,
)
from tests.unit_tests.client_mocks import mocked_client
from tests.unit_tests.conftest import create_widget
@pytest.fixture(params=["rect", "circle", "ellipse"])
+305 -128
View File
@@ -1,17 +1,34 @@
import numpy as np
import pyqtgraph as pg
import pytest
from bec_lib.endpoints import MessageEndpoints
from qtpy.QtCore import QPointF
from bec_widgets.tests.utils import create_widget
from bec_widgets.widgets.plots.image.image import Image
from tests.unit_tests.client_mocks import mocked_client
from tests.unit_tests.conftest import create_widget
##################################################
# Image widget base functionality tests
##################################################
def _set_signal_config(
client,
device_name: str,
signal_name: str,
signal_class: str,
ndim: int,
obj_name: str | None = None,
):
device = client.device_manager.devices[device_name]
device._info["signals"][signal_name] = {
"obj_name": obj_name or signal_name,
"signal_class": signal_class,
"component_name": signal_name,
"describe": {"signal_info": {"ndim": ndim}},
}
def test_initialization_defaults(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
assert bec_image_view.color_map == "plasma"
@@ -114,32 +131,35 @@ def test_enable_colorbar_with_vrange(qtbot, mocked_client, colorbar_type):
##############################################
# Previewsignal update mechanism
# Device/signal update mechanism
def test_image_setup_preview_signal_1d(qtbot, mocked_client, monkeypatch):
def test_image_setup_preview_signal_1d(qtbot, mocked_client):
"""
Ensure that calling .image() with a (device, signal, config) tuple representing
a 1D PreviewSignal connects using the 1D path and updates correctly.
Ensure that calling .image() with a 1D PreviewSignal connects using the 1D path
and updates correctly.
"""
import numpy as np
view = create_widget(qtbot, Image, client=mocked_client)
signal_config = {
"obj_name": "waveform1d_img",
"signal_class": "PreviewSignal",
"describe": {"signal_info": {"ndim": 1}},
}
_set_signal_config(
mocked_client,
"waveform1d",
"img",
signal_class="PreviewSignal",
ndim=1,
obj_name="waveform1d_img",
)
# Set the image monitor to the preview signal
view.image(monitor=("waveform1d", "img", signal_config))
view.image(device_name="waveform1d", device_entry="img")
# Subscriptions should indicate 1D preview connection
sub = view.subscriptions["main"]
assert sub.source == "device_monitor_1d"
assert sub.monitor_type == "1d"
assert sub.monitor == ("waveform1d", "img", signal_config)
assert view.device_name == "waveform1d"
assert view.device_entry == "img"
# Simulate a waveform update from the dispatcher
waveform = np.arange(25, dtype=float)
@@ -148,29 +168,32 @@ def test_image_setup_preview_signal_1d(qtbot, mocked_client, monkeypatch):
np.testing.assert_array_equal(view.main_image.raw_data[0], waveform)
def test_image_setup_preview_signal_2d(qtbot, mocked_client, monkeypatch):
def test_image_setup_preview_signal_2d(qtbot, mocked_client):
"""
Ensure that calling .image() with a (device, signal, config) tuple representing
a 2D PreviewSignal connects using the 2D path and updates correctly.
Ensure that calling .image() with a 2D PreviewSignal connects using the 2D path
and updates correctly.
"""
import numpy as np
view = create_widget(qtbot, Image, client=mocked_client)
signal_config = {
"obj_name": "eiger_img2d",
"signal_class": "PreviewSignal",
"describe": {"signal_info": {"ndim": 2}},
}
_set_signal_config(
mocked_client,
"eiger",
"img2d",
signal_class="PreviewSignal",
ndim=2,
obj_name="eiger_img2d",
)
# Set the image monitor to the preview signal
view.image(monitor=("eiger", "img2d", signal_config))
view.image(device_name="eiger", device_entry="img2d")
# Subscriptions should indicate 2D preview connection
sub = view.subscriptions["main"]
assert sub.source == "device_monitor_2d"
assert sub.monitor_type == "2d"
assert sub.monitor == ("eiger", "img2d", signal_config)
assert view.device_name == "eiger"
assert view.device_entry == "img2d"
# Simulate a 2D image update
test_data = np.arange(16, dtype=float).reshape(4, 4)
@@ -178,38 +201,197 @@ def test_image_setup_preview_signal_2d(qtbot, mocked_client, monkeypatch):
np.testing.assert_array_equal(view.main_image.image, test_data)
def test_preview_signals_skip_0d_entries(qtbot, mocked_client, monkeypatch):
"""
Preview/async combobox should omit 0D signals.
"""
view = create_widget(qtbot, Image, client=mocked_client)
def fake_get(signal_class_filter):
signal_classes = (
signal_class_filter
if isinstance(signal_class_filter, (list, tuple, set))
else [signal_class_filter]
)
if "PreviewSignal" in signal_classes:
return [
(
"eiger",
"sig0d",
{
"obj_name": "sig0d",
"signal_class": "PreviewSignal",
"describe": {"signal_info": {"ndim": 0}},
},
),
(
"eiger",
"sig2d",
{
"obj_name": "sig2d",
"signal_class": "PreviewSignal",
"describe": {"signal_info": {"ndim": 2}},
},
),
]
return []
monkeypatch.setattr(view.client.device_manager, "get_bec_signals", fake_get)
device_selection = view.toolbar.components.get_action("device_selection").widget
device_selection.signal_combo_box.set_device("eiger")
device_selection.signal_combo_box.update_signals_from_signal_classes()
texts = [
device_selection.signal_combo_box.itemText(i)
for i in range(device_selection.signal_combo_box.count())
]
assert "sig0d" not in texts
assert "sig2d" in texts
def test_image_async_signal_uses_obj_name(qtbot, mocked_client, monkeypatch):
"""
Verify async signals use obj_name for endpoints/payloads and reconnect with scan_id.
"""
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(
mocked_client, "eiger", "img", signal_class="AsyncSignal", ndim=1, obj_name="async_obj"
)
view.image(device_name="eiger", device_entry="img")
assert view.subscriptions["main"].async_signal_name == "async_obj"
assert view.async_update is True
# Prepare scan ids and capture dispatcher calls
view.old_scan_id = "old_scan"
view.scan_id = "new_scan"
connected = []
disconnected = []
monkeypatch.setattr(
view.bec_dispatcher,
"connect_slot",
lambda slot, endpoint, from_start=False, cb_info=None: connected.append(
(slot, endpoint, from_start, cb_info)
),
)
monkeypatch.setattr(
view.bec_dispatcher,
"disconnect_slot",
lambda slot, endpoint: disconnected.append((slot, endpoint)),
)
view._setup_async_image(view.scan_id)
expected_new = MessageEndpoints.device_async_signal("new_scan", "eiger", "async_obj")
expected_old = MessageEndpoints.device_async_signal("old_scan", "eiger", "async_obj")
assert any(ep == expected_new for _, ep, _, _ in connected)
assert any(ep == expected_old for _, ep in disconnected)
# Payload extraction should use obj_name
payload = np.array([1, 2, 3])
msg = {"signals": {"async_obj": {"value": payload}}}
assert np.array_equal(view._get_payload_data(msg), payload)
def test_disconnect_clears_async_state(qtbot, mocked_client, monkeypatch):
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(
mocked_client, "eiger", "img", signal_class="AsyncSignal", ndim=2, obj_name="async_obj"
)
view.image(device_name="eiger", device_entry="img")
view.scan_id = "scan_x"
view.old_scan_id = "scan_y"
view.subscriptions["main"].async_signal_name = "async_obj"
# Avoid touching real dispatcher
monkeypatch.setattr(view.bec_dispatcher, "disconnect_slot", lambda *args, **kwargs: None)
view.disconnect_monitor(device_name="eiger", device_entry="img")
assert view.subscriptions["main"].async_signal_name is None
assert view.async_update is False
##############################################
# Device monitor endpoint update mechanism
# Connection guardrails
def test_image_setup_image_2d(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.image(monitor="eiger", monitor_type="2d")
assert bec_image_view.monitor == "eiger"
assert bec_image_view.subscriptions["main"].source == "device_monitor_2d"
assert bec_image_view.subscriptions["main"].monitor_type == "2d"
assert bec_image_view.main_image.raw_data is None
assert bec_image_view.main_image.image is None
def test_image_setup_rejects_unsupported_signal_class(qtbot, mocked_client):
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(mocked_client, "eiger", "img", signal_class="Signal", ndim=2)
view.image(device_name="eiger", device_entry="img")
assert view.subscriptions["main"].source is None
assert view.subscriptions["main"].monitor_type is None
assert view.async_update is False
def test_image_setup_image_1d(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.image(monitor="eiger", monitor_type="1d")
assert bec_image_view.monitor == "eiger"
assert bec_image_view.subscriptions["main"].source == "device_monitor_1d"
assert bec_image_view.subscriptions["main"].monitor_type == "1d"
assert bec_image_view.main_image.raw_data is None
assert bec_image_view.main_image.image is None
def test_image_disconnects_with_missing_entry(qtbot, mocked_client):
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(mocked_client, "eiger", "img", signal_class="PreviewSignal", ndim=2)
view.image(device_name="eiger", device_entry="img")
assert view.device_name == "eiger"
assert view.device_entry == "img"
view.image(device_name="eiger", device_entry=None)
assert view.device_name == ""
assert view.device_entry == ""
def test_image_setup_image_auto(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.image(monitor="eiger", monitor_type="auto")
assert bec_image_view.monitor == "eiger"
assert bec_image_view.subscriptions["main"].source == "auto"
assert bec_image_view.subscriptions["main"].monitor_type == "auto"
assert bec_image_view.main_image.raw_data is None
assert bec_image_view.main_image.image is None
def test_handle_scan_change_clears_buffers_and_resets_crosshair(qtbot, mocked_client, monkeypatch):
view = create_widget(qtbot, Image, client=mocked_client)
view.scan_id = "scan_1"
view.main_image.buffer = [np.array([1.0, 2.0])]
view.main_image.max_len = 2
clear_called = []
monkeypatch.setattr(view.main_image, "clear", lambda: clear_called.append(True))
reset_called = []
if view.crosshair is not None:
monkeypatch.setattr(view.crosshair, "reset", lambda: reset_called.append(True))
view._handle_scan_change("scan_2")
assert view.old_scan_id == "scan_1"
assert view.scan_id == "scan_2"
assert clear_called == [True]
assert view.main_image.buffer == []
assert view.main_image.max_len == 0
if view.crosshair is not None:
assert reset_called == [True]
def test_handle_scan_change_reconnects_async(qtbot, mocked_client, monkeypatch):
view = create_widget(qtbot, Image, client=mocked_client)
view.scan_id = "scan_1"
view.async_update = True
called = []
monkeypatch.setattr(view, "_setup_async_image", lambda scan_id: called.append(scan_id))
view._handle_scan_change("scan_2")
assert called == ["scan_2"]
def test_handle_scan_change_same_scan_noop(qtbot, mocked_client, monkeypatch):
view = create_widget(qtbot, Image, client=mocked_client)
view.scan_id = "scan_1"
view.main_image.buffer = [np.array([1.0])]
view.main_image.max_len = 1
clear_called = []
monkeypatch.setattr(view.main_image, "clear", lambda: clear_called.append(True))
view._handle_scan_change("scan_1")
assert view.scan_id == "scan_1"
assert clear_called == []
assert view.main_image.buffer == [np.array([1.0])]
assert view.main_image.max_len == 1
def test_image_data_update_2d(qtbot, mocked_client):
@@ -245,8 +427,7 @@ def test_toolbar_actions_presence(qtbot, mocked_client):
assert bec_image_view.toolbar.components.exists("image_autorange")
assert bec_image_view.toolbar.components.exists("lock_aspect_ratio")
assert bec_image_view.toolbar.components.exists("image_processing_fft")
assert bec_image_view.toolbar.components.exists("image_device_combo")
assert bec_image_view.toolbar.components.exists("image_dim_combo")
assert bec_image_view.toolbar.components.exists("device_selection")
def test_auto_emit_syncs_image_toolbar_actions(qtbot, mocked_client):
@@ -327,13 +508,40 @@ def test_setting_vrange_with_colorbar(qtbot, mocked_client, colorbar_type):
###################################
def test_setup_image_from_toolbar(qtbot, mocked_client):
def test_setup_image_from_toolbar(qtbot, mocked_client, monkeypatch):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.device_combo_box.setCurrentText("eiger")
bec_image_view.dim_combo_box.setCurrentText("2d")
_set_signal_config(mocked_client, "eiger", "img", signal_class="PreviewSignal", ndim=2)
monkeypatch.setattr(
mocked_client.device_manager,
"get_bec_signals",
lambda signal_class_filter: (
[
(
"eiger",
"img",
{
"obj_name": "img",
"signal_class": "PreviewSignal",
"describe": {"signal_info": {"ndim": 2}},
},
)
]
if "PreviewSignal" in (signal_class_filter or [])
else []
),
)
assert bec_image_view.monitor == "eiger"
device_selection = bec_image_view.toolbar.components.get_action("device_selection").widget
device_selection.device_combo_box.update_devices_from_filters()
device_selection.device_combo_box.setCurrentText("eiger")
device_selection.signal_combo_box.setCurrentText("img")
bec_image_view.on_device_selection_changed(None)
qtbot.wait(200)
assert bec_image_view.device_name == "eiger"
assert bec_image_view.device_entry == "img"
assert bec_image_view.subscriptions["main"].source == "device_monitor_2d"
assert bec_image_view.subscriptions["main"].monitor_type == "2d"
assert bec_image_view.main_image.raw_data is None
@@ -598,90 +806,59 @@ def test_roi_plot_data_from_image(qtbot, mocked_client):
##############################################
# MonitorSelectionToolbarBundle specific tests
# Device selection toolbar sync
##############################################
def test_monitor_selection_reverse_device_items(qtbot, mocked_client):
"""
Verify that _reverse_device_items correctly reverses the order of items in the
device combobox while preserving the current selection.
"""
def test_device_selection_syncs_from_properties(qtbot, mocked_client, monkeypatch):
view = create_widget(qtbot, Image, client=mocked_client)
combo = view.device_combo_box
# Replace existing items with a deterministic list
combo.clear()
combo.addItem("samx", 1)
combo.addItem("samy", 2)
combo.addItem("samz", 3)
combo.setCurrentText("samy")
# Reverse the items
view._reverse_device_items()
# Order should be reversed and selection preserved
assert [combo.itemText(i) for i in range(combo.count())] == ["samz", "samy", "samx"]
assert combo.currentText() == "samy"
def test_monitor_selection_populate_preview_signals(qtbot, mocked_client, monkeypatch):
"""
Verify that _populate_preview_signals adds previewsignal devices to the combobox
with the correct userData.
"""
view = create_widget(qtbot, Image, client=mocked_client)
# Provide a deterministic fake device_manager with get_bec_signals
class _FakeDM:
def get_bec_signals(self, _filter):
return [
("eiger", "img", {"obj_name": "eiger_img"}),
("async_device", "img2", {"obj_name": "async_device_img2"}),
_set_signal_config(mocked_client, "eiger", "img2d", signal_class="PreviewSignal", ndim=2)
monkeypatch.setattr(
view.client.device_manager,
"get_bec_signals",
lambda signal_class_filter: (
[
(
"eiger",
"img2d",
{
"obj_name": "img2d",
"signal_class": "PreviewSignal",
"describe": {"signal_info": {"ndim": 2}},
},
)
]
if "PreviewSignal" in (signal_class_filter or [])
else []
),
)
monkeypatch.setattr(view.client, "device_manager", _FakeDM())
view.device_name = "eiger"
view.device_entry = "img2d"
initial_count = view.device_combo_box.count()
qtbot.wait(200) # Allow signal processing
view._populate_preview_signals()
# Two new entries should have been added
assert view.device_combo_box.count() == initial_count + 2
# The first newly added item should carry tuple userData describing the device/signal
data = view.device_combo_box.itemData(initial_count)
assert isinstance(data, tuple) and data[0] == "eiger"
device_selection = view.toolbar.components.get_action("device_selection").widget
qtbot.waitUntil(
lambda: device_selection.device_combo_box.currentText() == "eiger"
and device_selection.signal_combo_box.currentText() == "img2d",
timeout=1000,
)
def test_monitor_selection_adjust_and_connect(qtbot, mocked_client, monkeypatch):
"""
Verify that _adjust_and_connect performs the full set-up:
- fills the combobox with preview signals,
- reverses their order,
- and resets the currentText to an empty string.
"""
def test_device_entry_syncs_from_toolbar(qtbot, mocked_client):
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(mocked_client, "eiger", "img_a", signal_class="PreviewSignal", ndim=2)
_set_signal_config(mocked_client, "eiger", "img_b", signal_class="PreviewSignal", ndim=2)
# Deterministic fake device_manager
class _FakeDM:
def get_bec_signals(self, _filter):
return [("eiger", "img", {"obj_name": "eiger_img"})]
view.device_name = "eiger"
view.device_entry = "img_a"
monkeypatch.setattr(view.client, "device_manager", _FakeDM())
device_selection = view.toolbar.components.get_action("device_selection").widget
device_selection.signal_combo_box.blockSignals(True)
device_selection.signal_combo_box.setCurrentText("img_b")
device_selection.signal_combo_box.blockSignals(False)
combo = view.device_combo_box
# Start from a clean state
combo.clear()
combo.addItem("", None)
combo.setCurrentText("")
view._sync_device_entry_from_toolbar()
# Execute the method under test
view._adjust_and_connect()
# Expect exactly two items: preview label followed by the empty default
assert combo.count() == 2
# Because of the reversal, the preview label comes first
assert combo.itemText(0) == "eiger_img"
# Current selection remains empty
assert combo.currentText() == ""
assert view.device_entry == "img_b"
-2
View File
@@ -11,8 +11,6 @@ from bec_widgets.applications.launch_window import LaunchWindow
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
from .client_mocks import mocked_client
base_path = os.path.dirname(bec_widgets.__file__)
+1 -3
View File
@@ -3,11 +3,9 @@ from unittest import mock
import numpy as np
import pytest
from bec_widgets.tests.utils import create_widget
from bec_widgets.widgets.dap.lmfit_dialog.lmfit_dialog import LMFitDialog
from .client_mocks import mocked_client
from .conftest import create_widget
@pytest.fixture(scope="function")
def lmfit_dialog(qtbot, mocked_client):
-2
View File
@@ -18,8 +18,6 @@ from bec_widgets.widgets.utility.logpanel._util import (
)
from bec_widgets.widgets.utility.logpanel.logpanel import DEFAULT_LOG_COLORS, LogPanel
from .client_mocks import mocked_client
TEST_TABLE_STRING = "2025-01-15 15:57:18 | bec_server.scan_server.scan_queue | [INFO] | \n \x1b[3m primary queue / ScanQueueStatus.RUNNING \x1b[0m\n┏━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━┓\n\x1b[1m \x1b[0m\x1b[1m queue_id \x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mscan_id\x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mis_scan\x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mtype\x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mscan_numb…\x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mIQ status\x1b[0m\x1b[1m \x1b[0m┃\n┡━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━┩\n│ bbe50c82-6… │ None │ False │ mv │ None │ PENDING │\n└─────────────┴─────────┴─────────┴──────┴────────────┴───────────┘\n\n"
TEST_LOG_MESSAGES = [
-2
View File
@@ -4,8 +4,6 @@ from qtpy.QtWidgets import QWidget
from bec_widgets.applications.main_app import BECMainApp
from bec_widgets.applications.views.view import ViewBase
from .client_mocks import mocked_client
ANIM_TEST_DURATION = 60 # ms
+1 -3
View File
@@ -5,6 +5,7 @@ from qtpy.QtCore import QEvent, QPoint, QPointF
from qtpy.QtGui import QEnterEvent
from qtpy.QtWidgets import QApplication, QFrame, QLabel
from bec_widgets.tests.utils import create_widget
from bec_widgets.widgets.containers.main_window.addons.hover_widget import (
HoverWidget,
WidgetTooltip,
@@ -13,9 +14,6 @@ from bec_widgets.widgets.containers.main_window.addons.scroll_label import Scrol
from bec_widgets.widgets.containers.main_window.addons.web_links import BECWebLinksMixin
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
from .client_mocks import mocked_client
from .conftest import create_widget
@pytest.fixture
def bec_main_window(qtbot, mocked_client):
-2
View File
@@ -8,8 +8,6 @@ from qtpy.QtWidgets import QFileDialog, QMessageBox
from bec_widgets.widgets.editors.monaco.monaco_dock import MonacoDock
from bec_widgets.widgets.editors.monaco.monaco_widget import MonacoWidget
from .client_mocks import mocked_client
@pytest.fixture
def monaco_dock(qtbot, mocked_client) -> Generator[MonacoDock, None, None]:
-1
View File
@@ -7,7 +7,6 @@ from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets.editors.monaco.monaco_widget import MonacoWidget
from bec_widgets.widgets.editors.monaco.scan_control_dialog import ScanControlDialog
from .client_mocks import mocked_client
from .test_scan_control import available_scans_message
+1 -3
View File
@@ -1,9 +1,7 @@
from qtpy.QtTest import QSignalSpy
from bec_widgets.tests.utils import create_widget
from bec_widgets.widgets.plots.motor_map.motor_map import MotorMap
from tests.unit_tests.client_mocks import mocked_client
from .conftest import create_widget
def test_motor_map_initialization(qtbot, mocked_client):
@@ -1,9 +1,7 @@
import numpy as np
from bec_widgets.tests.utils import create_widget
from bec_widgets.widgets.plots.multi_waveform.multi_waveform import MultiWaveform
from tests.unit_tests.client_mocks import mocked_client
from .conftest import create_widget
##################################################
# MultiWaveform widget base functionality tests
-2
View File
@@ -13,8 +13,6 @@ from bec_widgets.widgets.containers.main_window.addons.notification_center.notif
SeverityKind,
)
from .client_mocks import mocked_client
@pytest.fixture
def toast(qtbot):
-2
View File
@@ -4,8 +4,6 @@ from qtpy.QtPdfWidgets import QPdfView
from bec_widgets.widgets.utility.pdf_viewer.pdf_viewer import PdfViewerWidget
from .client_mocks import mocked_client
@pytest.fixture
def pdf_viewer_widget(qtbot, mocked_client):
+1 -3
View File
@@ -1,10 +1,8 @@
import numpy as np
from bec_widgets.tests.utils import create_widget
from bec_widgets.widgets.plots.plot_base import PlotBase, UIMode
from .client_mocks import mocked_client
from .conftest import create_widget
# pylint: disable=unused-import
# pylint: disable=missing-function-docstring
# pylint: disable=redefined-outer-name
+1 -4
View File
@@ -7,7 +7,7 @@ from qtpy.QtCore import Qt, QTimer
from qtpy.QtGui import QValidator
from qtpy.QtWidgets import QPushButton
from bec_widgets.tests.utils import Positioner
from bec_widgets.tests.utils import Positioner, create_widget
from bec_widgets.widgets.control.device_control.positioner_box import (
PositionerBox,
PositionerControlLine,
@@ -16,9 +16,6 @@ from bec_widgets.widgets.control.device_input.device_line_edit.device_line_edit
DeviceLineEdit,
)
from .client_mocks import mocked_client
from .conftest import create_widget
class PositionerWithoutPrecision(Positioner):
"""just placeholder for testing embedded isinstance check in DeviceCombobox"""
+1 -3
View File
@@ -2,11 +2,9 @@ from unittest import mock
import pytest
from bec_widgets.tests.utils import create_widget
from bec_widgets.widgets.control.device_control.positioner_box import PositionerBox2D
from .client_mocks import mocked_client
from .conftest import create_widget
@pytest.fixture
def positioner_box_2d(qtbot, mocked_client):
-2
View File
@@ -7,8 +7,6 @@ from qtpy.QtWidgets import QMessageBox
from bec_widgets.widgets.control.buttons.button_reset.button_reset import ResetButton
from .client_mocks import mocked_client
@pytest.fixture
def reset_button(qtbot, mocked_client):
-2
View File
@@ -4,8 +4,6 @@ import pytest
from bec_widgets.widgets.control.buttons.button_resume.button_resume import ResumeButton
from .client_mocks import mocked_client
@pytest.fixture
def resume_button(qtbot, mocked_client):
@@ -10,8 +10,6 @@ from qtpy.QtGui import QColor
from bec_widgets.utils import Colors
from bec_widgets.widgets.progress.ring_progress_bar.ring_progress_bar import RingProgressBar
from .client_mocks import mocked_client
@pytest.fixture
def ring_progress_bar(qtbot, mocked_client):
@@ -11,8 +11,6 @@ from bec_widgets.widgets.progress.ring_progress_bar.ring_progress_bar import (
RingProgressContainerWidget,
)
from .client_mocks import mocked_client
@pytest.fixture
def ring_container(qtbot, mocked_client):
@@ -3,7 +3,6 @@ import pytest
from bec_widgets.utils.settings_dialog import SettingsDialog
from bec_widgets.widgets.progress.ring_progress_bar.ring_progress_bar import RingProgressBar
from bec_widgets.widgets.progress.ring_progress_bar.ring_progress_settings_cards import RingSettings
from tests.unit_tests.client_mocks import mocked_client
@pytest.fixture
-2
View File
@@ -9,8 +9,6 @@ from bec_widgets.cli.server import GUIServer
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.rpc_server import RegistryNotReadyError, RPCServer, SingleshotRPCRepeat
from .client_mocks import mocked_client
class DummyWidget(BECConnector, QWidget):
def __init__(self, parent=None, client=None, **kwargs):
+1 -1
View File
@@ -9,7 +9,7 @@ def test_rpc_widget_handler():
handler = RPCWidgetHandler()
assert "Image" in handler.widget_classes
assert "RingProgressBar" in handler.widget_classes
assert "AdvancedDockArea" in handler.widget_classes
assert "BECDockArea" in handler.widget_classes
class _TestPluginWidget(BECWidget): ...
-2
View File
@@ -11,8 +11,6 @@ from bec_widgets.utils.forms_from_types.items import StrFormItem
from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets.control.scan_control import ScanControl
from .client_mocks import mocked_client
# pylint: disable=no-member
# pylint: disable=missing-function-docstring
# pylint: disable=redefined-outer-name
@@ -15,8 +15,6 @@ from bec_widgets.widgets.services.scan_history_browser.scan_history_browser impo
ScanHistoryBrowser,
)
from .client_mocks import mocked_client
@pytest.fixture
def scan_history_msg():
@@ -14,8 +14,6 @@ from bec_widgets.widgets.progress.scan_progressbar.scan_progressbar import (
ScanProgressBar,
)
from .client_mocks import mocked_client
@pytest.fixture
def scan_progressbar(qtbot, mocked_client):
+7 -7
View File
@@ -1,7 +1,8 @@
from unittest.mock import patch
from unittest.mock import MagicMock, patch
import numpy as np
from bec_widgets.tests.utils import create_widget
from bec_widgets.widgets.plots.scatter_waveform.scatter_curve import (
ScatterCurveConfig,
ScatterDeviceSignal,
@@ -10,9 +11,6 @@ from bec_widgets.widgets.plots.scatter_waveform.scatter_waveform import ScatterW
from bec_widgets.widgets.plots.scatter_waveform.settings.scatter_curve_setting import (
ScatterCurveSettings,
)
from tests.unit_tests.client_mocks import create_dummy_scan_item, mocked_client
from .conftest import create_widget
def test_waveform_initialization(qtbot, mocked_client):
@@ -53,14 +51,16 @@ def test_scatter_waveform_update_with_scan_history(qtbot, mocked_client, monkeyp
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
dummy_scan = create_dummy_scan_item()
mocked_client.history = MagicMock()
# .get_by_scan_id() typically returns historical data, but we abuse it here
# to return mock live data
mocked_client.history.get_by_scan_id.return_value = dummy_scan
mocked_client.history.__getitem__.return_value = dummy_scan
swf.plot("samx", "samy", "bpm4i", label="test_curve")
swf.update_with_scan_history(scan_id="dummy")
qtbot.wait(500)
assert swf.scan_item == dummy_scan
qtbot.waitUntil(lambda: swf.scan_item == dummy_scan, timeout=500)
qtbot.wait(200)
x_data, y_data = swf.main_curve.getData()
np.testing.assert_array_equal(x_data, [10, 20, 30])
+38
View File
@@ -0,0 +1,38 @@
from qtpy.QtCore import QRect
from qtpy.QtWidgets import QWidget
from bec_widgets.utils.screen_utils import (
apply_centered_size,
centered_geometry,
main_app_size_for_screen,
)
def test_centered_geometry_returns_expected_tuple():
available = QRect(100, 50, 800, 600)
result = centered_geometry(available, 400, 300)
assert result == (300, 200, 400, 300)
def test_main_app_size_for_screen_respects_16_9_and_screen_caps():
available = QRect(0, 0, 1920, 1080)
width, height = main_app_size_for_screen(available)
assert (width, height) == (1728, 972)
narrow = QRect(0, 0, 1000, 800)
width, height = main_app_size_for_screen(narrow)
assert (width, height) == (900, 506)
def test_apply_centered_size_uses_provided_geometry(qtbot):
widget = QWidget()
qtbot.addWidget(widget)
available = QRect(10, 20, 600, 400)
apply_centered_size(widget, 200, 100, available=available)
geometry = widget.geometry()
assert geometry.x() == 210
assert geometry.y() == 170
assert geometry.width() == 200
assert geometry.height() == 100
-2
View File
@@ -11,8 +11,6 @@ from bec_widgets.widgets.control.device_input.base_classes.device_signal_input_b
)
from bec_widgets.widgets.utility.signal_label.signal_label import ChoiceDialog, SignalLabel
from .client_mocks import mocked_client
SAMX_INFO_DICT = {
"signals": {
"readback": {
-2
View File
@@ -4,8 +4,6 @@ import pytest
from bec_widgets.widgets.control.buttons.stop_button.stop_button import StopButton
from .client_mocks import mocked_client
@pytest.fixture
def stop_button(qtbot, mocked_client):
-2
View File
@@ -2,8 +2,6 @@ import pytest
from bec_widgets.widgets.editors.text_box.text_box import DEFAULT_TEXT, TextBox
from .client_mocks import mocked_client
@pytest.fixture
def text_box_widget(qtbot, mocked_client):
@@ -3,12 +3,10 @@ from unittest import mock
import pyqtgraph as pg
import pytest
from bec_widgets.tests.utils import create_widget
from bec_widgets.utils.bec_signal_proxy import BECSignalProxy
from bec_widgets.widgets.dap.dap_combo_box.dap_combo_box import DapComboBox
from .client_mocks import mocked_client
from .conftest import create_widget
@pytest.fixture
def dap_combo_box(qtbot, mocked_client):

Some files were not shown because too many files have changed in this diff Show More