mirror of
https://github.com/bec-project/bec_widgets.git
synced 2026-04-19 23:05:36 +02:00
Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0717426db2 | ||
| f4af6ebc5f | |||
| a923f12c97 | |||
| a5a7607a83 | |||
| 9de548446b | |||
| 49ac7decf7 | |||
|
|
092bed38fa | ||
| 50c84a766a | |||
| d22a3317ba | |||
| 6df1d0c31f | |||
| 946752a4b0 | |||
| c1f62ad6cb | |||
| a5adf3a97d | |||
|
|
76e3e0b60f | ||
| f18eeb9c5d | |||
| 32ce8e2818 | |||
| 23413cffab | |||
|
|
4bbb8fa519 |
64
CHANGELOG.md
64
CHANGELOG.md
@@ -1,6 +1,70 @@
|
||||
# CHANGELOG
|
||||
|
||||
|
||||
## v2.34.0 (2025-08-07)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
- Plugin widget import machinery
|
||||
([`9de5484`](https://github.com/bec-project/bec_widgets/commit/9de548446b9975c0f692757c66ffa07b9a849f15))
|
||||
|
||||
- lazy import client so plugin widgets can import BECWidgets which use it indirectly - exclude
|
||||
classes originating from bec_widgets core from plugin discovery - better errors
|
||||
|
||||
- Use better source for plugin repo name
|
||||
([`f4af6eb`](https://github.com/bec-project/bec_widgets/commit/f4af6ebc5fabf5b62ec87b580476d93d52690b08))
|
||||
|
||||
### Features
|
||||
|
||||
- Autoformat compiled file and add docs
|
||||
([`a923f12`](https://github.com/bec-project/bec_widgets/commit/a923f12c974192909222fcada9eca97325866d74))
|
||||
|
||||
- **plugin manager**: Add cli commands
|
||||
([`49ac7de`](https://github.com/bec-project/bec_widgets/commit/49ac7decf7d4cf461e6437f7285dc6967ee36d96))
|
||||
|
||||
|
||||
## v2.33.3 (2025-07-31)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
- **scan-history-view**: Account for async loading of scan history
|
||||
([`6df1d0c`](https://github.com/bec-project/bec_widgets/commit/6df1d0c31fb58c25b01e95e2247277ff2dd5d00e))
|
||||
|
||||
### Refactoring
|
||||
|
||||
- Improve scan history performance on loading full scan lists
|
||||
([`a5adf3a`](https://github.com/bec-project/bec_widgets/commit/a5adf3a97d9ff05cef833445c1e6cd8f35a9a2fa))
|
||||
|
||||
- Make ids a set, cleanup
|
||||
([`c1f62ad`](https://github.com/bec-project/bec_widgets/commit/c1f62ad6cb00d9b392a8e0b6247f5260dfb37256))
|
||||
|
||||
- Use client callback for scan history reload
|
||||
([`d22a331`](https://github.com/bec-project/bec_widgets/commit/d22a3317baeccfcc4e074dcef4e3912301d210c5))
|
||||
|
||||
- **scan-history**: Add spinner for loading time of history
|
||||
([`50c84a7`](https://github.com/bec-project/bec_widgets/commit/50c84a766a2b021768fb2c0e8ee00b8e5f058ba7))
|
||||
|
||||
- **scan-history**: Fix insert logic; cleanup
|
||||
([`946752a`](https://github.com/bec-project/bec_widgets/commit/946752a4b05804c2f59cb5c21e4c1d11709a7d44))
|
||||
|
||||
|
||||
## v2.33.2 (2025-07-31)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
- Delete choice dialog on close
|
||||
([`23413cf`](https://github.com/bec-project/bec_widgets/commit/23413cffabe721e35bb5bb726ec34d74dc4ffe05))
|
||||
|
||||
- Display short lists in SignalDisplay
|
||||
([`4bbb8fa`](https://github.com/bec-project/bec_widgets/commit/4bbb8fa519e8a90eebfcfa34e157493c9baa7880))
|
||||
|
||||
- Don't warn on empty DeviceEdit init
|
||||
([`f18eeb9`](https://github.com/bec-project/bec_widgets/commit/f18eeb9c5dccbd9348b6ee6d1477a8b7925d40fc))
|
||||
|
||||
- Remove config, directly set device+signal
|
||||
([`32ce8e2`](https://github.com/bec-project/bec_widgets/commit/32ce8e2818ceacda87e48399e3ed4df0cabb2335))
|
||||
|
||||
|
||||
## v2.33.1 (2025-07-31)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
@@ -4563,6 +4563,20 @@ class SignalLabel(RPCBase):
|
||||
Displays the full data from array signals if set to True.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def max_list_display_len(self) -> "int":
|
||||
"""
|
||||
For small lists, the max length to display
|
||||
"""
|
||||
|
||||
@max_list_display_len.setter
|
||||
@rpc_call
|
||||
def max_list_display_len(self) -> "int":
|
||||
"""
|
||||
For small lists, the max length to display
|
||||
"""
|
||||
|
||||
|
||||
class SignalLineEdit(RPCBase):
|
||||
"""Line edit widget for device input with autocomplete for device names."""
|
||||
|
||||
@@ -14,18 +14,21 @@ from typing import TYPE_CHECKING, Literal, TypeAlias, cast
|
||||
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_lib.utils.import_utils import lazy_import_from
|
||||
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
|
||||
import bec_widgets.cli.client as client
|
||||
from bec_widgets.cli.rpc.rpc_base import RPCBase, RPCReference
|
||||
from bec_widgets.utils.serialization import register_serializer_extension
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_lib.messages import GUIRegistryStateMessage
|
||||
|
||||
import bec_widgets.cli.client as client
|
||||
else:
|
||||
GUIRegistryStateMessage = lazy_import_from("bec_lib.messages", "GUIRegistryStateMessage")
|
||||
client = lazy_import("bec_widgets.cli.client")
|
||||
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
@@ -38,9 +38,11 @@ def _loaded_submodules_from_specs(
|
||||
try:
|
||||
submodule.__loader__.exec_module(submodule)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error loading plugin {submodule}: \n{''.join(traceback.format_exception(e))}"
|
||||
)
|
||||
exception_text = "".join(traceback.format_exception(e))
|
||||
if "(most likely due to a circular import)" in exception_text:
|
||||
logger.warning(f"Circular import encountered while loading {submodule}")
|
||||
else:
|
||||
logger.error(f"Error loading plugin {submodule}: \n{exception_text}")
|
||||
yield submodule
|
||||
|
||||
|
||||
@@ -59,7 +61,8 @@ def _get_widgets_from_module(module: ModuleType) -> BECClassContainer:
|
||||
module,
|
||||
predicate=lambda item: inspect.isclass(item)
|
||||
and issubclass(item, BECWidget)
|
||||
and item is not BECWidget,
|
||||
and item is not BECWidget
|
||||
and not item.__module__.startswith("bec_widgets"),
|
||||
)
|
||||
return BECClassContainer(
|
||||
BECClassInfo(name=k, module=module.__name__, file=module.__loader__.get_filename(), obj=v)
|
||||
|
||||
0
bec_widgets/utils/bec_plugin_manager/__init__.py
Normal file
0
bec_widgets/utils/bec_plugin_manager/__init__.py
Normal file
86
bec_widgets/utils/bec_plugin_manager/create/widget.py
Normal file
86
bec_widgets/utils/bec_plugin_manager/create/widget.py
Normal file
@@ -0,0 +1,86 @@
|
||||
import traceback
|
||||
from pathlib import Path
|
||||
from typing import Annotated
|
||||
|
||||
import copier
|
||||
import typer
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_lib.plugin_helper import plugin_repo_path
|
||||
from bec_lib.utils.plugin_manager._constants import ANSWER_KEYS
|
||||
from bec_lib.utils.plugin_manager._util import existing_data, git_stage_files, make_commit
|
||||
|
||||
from bec_widgets.utils.bec_plugin_manager.edit_ui import open_and_watch_ui_editor
|
||||
|
||||
logger = bec_logger.logger
|
||||
_app = typer.Typer(rich_markup_mode="rich")
|
||||
|
||||
|
||||
def _commit_added_widget(repo: Path, name: str):
|
||||
git_stage_files(repo, [".copier-answers.yml"])
|
||||
git_stage_files(repo / repo.name / "bec_widgets" / "widgets" / name, [])
|
||||
make_commit(repo, f"plugin-manager added new widget: {name}")
|
||||
logger.info(f"Committing new widget {name}")
|
||||
|
||||
|
||||
def _widget_exists(widget_list: list[dict[str, str | bool]], name: str):
|
||||
return name in [w["name"] for w in widget_list]
|
||||
|
||||
|
||||
def _editor_cb(ctx: typer.Context, value: bool):
|
||||
if value and not ctx.params["use_ui"]:
|
||||
raise typer.BadParameter("Can only open the editor if creating a .ui file!")
|
||||
return value
|
||||
|
||||
|
||||
_bold_blue = "\033[34m\033[1m"
|
||||
_off = "\033[0m"
|
||||
_USE_UI_MSG = "Generate a .ui file for use in bec-designer."
|
||||
_OPEN_DESIGNER_MSG = f"""This app can watch for changes and recompile them to a python file imported to the widget whenever it is saved.
|
||||
To open this editor independently, you can use {_bold_blue}bec-plugin-manager edit-ui [widget_name]{_off}.
|
||||
Open the created widget .ui file in bec-designer now?"""
|
||||
|
||||
|
||||
@_app.command()
|
||||
def widget(
|
||||
name: Annotated[str, typer.Argument(help="Enter a name for your widget in snake_case")],
|
||||
use_ui: Annotated[bool, typer.Option(prompt=_USE_UI_MSG, help=_USE_UI_MSG)] = True,
|
||||
open_editor: Annotated[
|
||||
bool, typer.Option(prompt=_OPEN_DESIGNER_MSG, help=_OPEN_DESIGNER_MSG, callback=_editor_cb)
|
||||
] = True,
|
||||
):
|
||||
"""Create a new widget plugin with the given name.
|
||||
|
||||
If [bold white]use_ui[/bold white] is set, a bec-designer .ui file will also be created. If \
|
||||
[bold white]open_editor[/bold white] is additionally set, the .ui file will be opened in \
|
||||
bec-designer and the compiled python version will be updated when changes are made and saved."""
|
||||
if (formatted_name := name.lower().replace("-", "_")) != name:
|
||||
logger.warning(f"Adjusting widget name from {name} to {formatted_name}")
|
||||
if not formatted_name.isidentifier():
|
||||
logger.error(
|
||||
f"{name} is not a valid name for a widget (even after converting to {formatted_name}) - please enter something in snake_case"
|
||||
)
|
||||
exit(-1)
|
||||
logger.info(f"Adding new widget {formatted_name} to the template...")
|
||||
try:
|
||||
repo = Path(plugin_repo_path())
|
||||
plugin_data = existing_data(repo, [ANSWER_KEYS.VERSION, ANSWER_KEYS.WIDGETS])
|
||||
if _widget_exists(plugin_data[ANSWER_KEYS.WIDGETS], formatted_name):
|
||||
logger.error(f"Widget {formatted_name} already exists!")
|
||||
exit(-1)
|
||||
plugin_data[ANSWER_KEYS.WIDGETS].append({"name": formatted_name, "use_ui": use_ui})
|
||||
copier.run_update(
|
||||
repo,
|
||||
data=plugin_data,
|
||||
defaults=True,
|
||||
unsafe=True,
|
||||
overwrite=True,
|
||||
vcs_ref=plugin_data[ANSWER_KEYS.VERSION],
|
||||
)
|
||||
_commit_added_widget(repo, formatted_name)
|
||||
except Exception:
|
||||
logger.error(traceback.format_exc())
|
||||
logger.error("exiting...")
|
||||
exit(-1)
|
||||
logger.success(f"Added widget {formatted_name}!")
|
||||
if open_editor:
|
||||
open_and_watch_ui_editor(formatted_name)
|
||||
136
bec_widgets/utils/bec_plugin_manager/edit_ui.py
Normal file
136
bec_widgets/utils/bec_plugin_manager/edit_ui.py
Normal file
@@ -0,0 +1,136 @@
|
||||
import re
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_lib.plugin_helper import plugin_package_name, plugin_repo_path
|
||||
from watchdog.events import (
|
||||
DirCreatedEvent,
|
||||
DirModifiedEvent,
|
||||
DirMovedEvent,
|
||||
FileCreatedEvent,
|
||||
FileModifiedEvent,
|
||||
FileMovedEvent,
|
||||
FileSystemEvent,
|
||||
FileSystemEventHandler,
|
||||
)
|
||||
from watchdog.observers import Observer
|
||||
|
||||
from bec_widgets.utils.bec_designer import open_designer
|
||||
from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets
|
||||
from bec_widgets.utils.plugin_utils import get_custom_classes
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class RecompileHandler(FileSystemEventHandler):
|
||||
def __init__(self, in_file: Path, out_file: Path) -> None:
|
||||
super().__init__()
|
||||
self.in_file = str(in_file)
|
||||
self.out_file = str(out_file)
|
||||
self._pyside_import_re = re.compile(r"from PySide6\.(.*) import ")
|
||||
self._widget_import_re = re.compile(
|
||||
r"^from ([a-zA-Z_]*) import ([a-zA-Z_]*)$", re.MULTILINE
|
||||
)
|
||||
self._widget_modules = {
|
||||
c.name: c.module for c in (get_custom_classes("bec_widgets") + get_all_plugin_widgets())
|
||||
}
|
||||
|
||||
def on_created(self, event: DirCreatedEvent | FileCreatedEvent) -> None:
|
||||
self.recompile(event)
|
||||
|
||||
def on_modified(self, event: DirModifiedEvent | FileModifiedEvent) -> None:
|
||||
self.recompile(event)
|
||||
|
||||
def on_moved(self, event: DirMovedEvent | FileMovedEvent) -> None:
|
||||
self.recompile(event)
|
||||
|
||||
def recompile(self, event: FileSystemEvent) -> None:
|
||||
if event.src_path == self.in_file or event.dest_path == self.in_file:
|
||||
self._recompile()
|
||||
|
||||
def _recompile(self):
|
||||
logger.success(".ui file modified, recompiling...")
|
||||
code = subprocess.call(
|
||||
["pyside6-uic", "--absolute-imports", self.in_file, "-o", self.out_file]
|
||||
)
|
||||
logger.success(f"compilation exited with code {code}")
|
||||
if code != 0:
|
||||
return
|
||||
self._add_comment_to_file()
|
||||
logger.success("updating imports...")
|
||||
self._update_imports()
|
||||
logger.success("formatting...")
|
||||
code = subprocess.call(
|
||||
["black", "--line-length=100", "--skip-magic-trailing-comma", self.out_file]
|
||||
)
|
||||
if code != 0:
|
||||
logger.error(f"Error while running black on {self.out_file}, code: {code}")
|
||||
return
|
||||
code = subprocess.call(
|
||||
[
|
||||
"isort",
|
||||
"--line-length=100",
|
||||
"--profile=black",
|
||||
"--multi-line=3",
|
||||
"--trailing-comma",
|
||||
self.out_file,
|
||||
]
|
||||
)
|
||||
if code != 0:
|
||||
logger.error(f"Error while running isort on {self.out_file}, code: {code}")
|
||||
return
|
||||
logger.success("done!")
|
||||
|
||||
def _add_comment_to_file(self):
|
||||
with open(self.out_file, "r+") as f:
|
||||
initial = f.read()
|
||||
f.seek(0)
|
||||
f.write(f"# Generated from {self.in_file} by bec-plugin-manager - do not edit! \n")
|
||||
f.write(
|
||||
"# Use 'bec-plugin-manager edit-ui [widget_name]' to make changes, and this file will be updated accordingly. \n\n"
|
||||
)
|
||||
f.write(initial)
|
||||
|
||||
def _update_imports(self):
|
||||
with open(self.out_file, "r+") as f:
|
||||
initial = f.read()
|
||||
f.seek(0)
|
||||
qtpy_imports = re.sub(
|
||||
self._pyside_import_re, lambda ob: f"from qtpy.{ob.group(1)} import ", initial
|
||||
)
|
||||
print(self._widget_modules)
|
||||
print(re.findall(self._widget_import_re, qtpy_imports))
|
||||
widget_imports = re.sub(
|
||||
self._widget_import_re,
|
||||
lambda ob: (
|
||||
f"from {module} import {ob.group(2)}"
|
||||
if (module := self._widget_modules.get(ob.group(2))) is not None
|
||||
else ob.group(1)
|
||||
),
|
||||
qtpy_imports,
|
||||
)
|
||||
f.write(widget_imports)
|
||||
f.truncate()
|
||||
|
||||
|
||||
def open_and_watch_ui_editor(widget_name: str):
|
||||
logger.info(f"Opening the editor for {widget_name}, and watching")
|
||||
repo = Path(plugin_repo_path())
|
||||
widget_dir = repo / plugin_package_name() / "bec_widgets" / "widgets" / widget_name
|
||||
ui_file = widget_dir / f"{widget_name}.ui"
|
||||
ui_outfile = widget_dir / f"{widget_name}_ui.py"
|
||||
|
||||
logger.info(
|
||||
f"Opening the editor for {widget_name}, and watching {ui_file} for changes. Whenever you save the file, it will be recompiled to {ui_outfile}"
|
||||
)
|
||||
recompile_handler = RecompileHandler(ui_file, ui_outfile)
|
||||
observer = Observer()
|
||||
observer.schedule(recompile_handler, str(ui_file.parent))
|
||||
observer.start()
|
||||
try:
|
||||
open_designer([str(ui_file)])
|
||||
finally:
|
||||
observer.stop()
|
||||
observer.join()
|
||||
logger.info("Editing session ended, exiting...")
|
||||
@@ -112,7 +112,9 @@ class DeviceInputBase(BECWidget):
|
||||
WidgetIO.set_value(widget=self, value=device)
|
||||
self.config.default = device
|
||||
else:
|
||||
logger.warning(f"Device {device} is not in the filtered selection.")
|
||||
logger.warning(
|
||||
f"Device {device} is not in the filtered selection of {self}: {self.devices}."
|
||||
)
|
||||
|
||||
@SafeSlot()
|
||||
def update_devices_from_filters(self):
|
||||
@@ -131,7 +133,8 @@ class DeviceInputBase(BECWidget):
|
||||
# Filter based on readout priority
|
||||
devs = [dev for dev in devs if self._check_readout_filter(dev)]
|
||||
self.devices = [device.name for device in devs]
|
||||
self.set_device(current_device)
|
||||
if current_device != "":
|
||||
self.set_device(current_device)
|
||||
|
||||
@SafeSlot(list)
|
||||
def set_available_devices(self, devices: list[str]):
|
||||
|
||||
@@ -5,11 +5,13 @@ from typing import TYPE_CHECKING
|
||||
from bec_lib.callback_handler import EventType
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_lib.messages import ScanHistoryMessage
|
||||
from bec_qthemes import material_icon
|
||||
from qtpy import QtCore, QtGui, QtWidgets
|
||||
|
||||
from bec_widgets.utils.bec_widget import BECWidget, ConnectionConfig
|
||||
from bec_widgets.utils.colors import get_accent_colors
|
||||
from bec_widgets.utils.error_popups import SafeSlot
|
||||
from bec_widgets.widgets.utility.spinner.spinner import SpinnerWidget
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_lib.client import BECClient
|
||||
@@ -25,22 +27,38 @@ class BECHistoryManager(QtCore.QObject):
|
||||
|
||||
# ScanHistoryMessage.model_dump() (dict)
|
||||
scan_history_updated = QtCore.Signal(dict)
|
||||
scan_history_refreshed = QtCore.Signal(list)
|
||||
|
||||
def __init__(self, parent, client: BECClient):
|
||||
super().__init__(parent)
|
||||
self._load_attempt = 0
|
||||
self.client = client
|
||||
self._cb_id = self.client.callbacks.register(
|
||||
event_type=EventType.SCAN_HISTORY_UPDATE, callback=self._on_scan_history_update
|
||||
self._cb_id: dict[str, int] = {}
|
||||
self._cb_id["update_scan_history"] = self.client.callbacks.register(
|
||||
EventType.SCAN_HISTORY_UPDATE, self._on_scan_history_update
|
||||
)
|
||||
self._cb_id["scan_history_loaded"] = self.client.callbacks.register(
|
||||
EventType.SCAN_HISTORY_LOADED, self._on_scan_history_reloaded
|
||||
)
|
||||
|
||||
def refresh_scan_history(self) -> None:
|
||||
"""Refresh the scan history from the client."""
|
||||
all_messages = []
|
||||
# pylint: disable=protected-access
|
||||
for scan_id in self.client.history._scan_ids: # pylint: disable=protected-access
|
||||
history_msg = self.client.history._scan_data.get(scan_id, None)
|
||||
if history_msg is None:
|
||||
logger.info(f"Scan history message for scan_id {scan_id} not found.")
|
||||
continue
|
||||
self.scan_history_updated.emit(history_msg.model_dump())
|
||||
all_messages.append(history_msg.model_dump())
|
||||
self.scan_history_refreshed.emit(all_messages)
|
||||
|
||||
def _on_scan_history_reloaded(self, history_msgs: list[ScanHistoryMessage]) -> None:
|
||||
"""Handle scan history reloaded event from the client."""
|
||||
if not history_msgs:
|
||||
logger.warning("Scan history reloaded with no messages.")
|
||||
return
|
||||
self.scan_history_refreshed.emit([msg.model_dump() for msg in history_msgs])
|
||||
|
||||
def _on_scan_history_update(self, history_msg: ScanHistoryMessage) -> None:
|
||||
"""Handle scan history updates from the client."""
|
||||
@@ -48,8 +66,10 @@ class BECHistoryManager(QtCore.QObject):
|
||||
|
||||
def cleanup(self) -> None:
|
||||
"""Clean up the manager by disconnecting callbacks."""
|
||||
self.client.callbacks.remove(self._cb_id)
|
||||
for cb_id in self._cb_id.values():
|
||||
self.client.callbacks.remove(cb_id)
|
||||
self.scan_history_updated.disconnect()
|
||||
self.scan_history_refreshed.disconnect()
|
||||
|
||||
|
||||
class ScanHistoryView(BECWidget, QtWidgets.QTreeWidget):
|
||||
@@ -80,15 +100,10 @@ class ScanHistoryView(BECWidget, QtWidgets.QTreeWidget):
|
||||
theme_update=theme_update,
|
||||
**kwargs,
|
||||
)
|
||||
colors = get_accent_colors()
|
||||
self.status_colors = {
|
||||
"closed": colors.success,
|
||||
"halted": colors.warning,
|
||||
"aborted": colors.emergency,
|
||||
}
|
||||
# self.status_colors = {"closed": "#00e676", "halted": "#ffca28", "aborted": "#ff5252"}
|
||||
self.status_icons = self._create_status_icons()
|
||||
self.column_header = ["Scan Nr", "Scan Name", "Status"]
|
||||
self.scan_history: list[ScanHistoryMessage] = [] # newest at index 0
|
||||
self.scan_history_ids: set[str] = set() # scan IDs of the scan history
|
||||
self.max_length = max_length # Maximum number of scan history entries to keep
|
||||
self.bec_scan_history_manager = BECHistoryManager(parent=self, client=self.client)
|
||||
self._set_policies()
|
||||
@@ -97,6 +112,12 @@ class ScanHistoryView(BECWidget, QtWidgets.QTreeWidget):
|
||||
header = self.header()
|
||||
header.setToolTip(f"Last {self.max_length} scans in history.")
|
||||
self.bec_scan_history_manager.scan_history_updated.connect(self.update_history)
|
||||
self.bec_scan_history_manager.scan_history_refreshed.connect(self.update_full_history)
|
||||
self._container = QtWidgets.QStackedLayout()
|
||||
self._container.setStackingMode(QtWidgets.QStackedLayout.StackAll)
|
||||
self.setLayout(self._container)
|
||||
self._add_overlay()
|
||||
self._start_waiting_display()
|
||||
self.refresh()
|
||||
|
||||
def _set_policies(self):
|
||||
@@ -117,16 +138,52 @@ class ScanHistoryView(BECWidget, QtWidgets.QTreeWidget):
|
||||
for column in range(1, self.columnCount()):
|
||||
header.setSectionResizeMode(column, QtWidgets.QHeaderView.ResizeMode.Stretch)
|
||||
|
||||
def _create_status_icons(self) -> dict[str, QtGui.QIcon]:
|
||||
"""Create status icons for the scan history."""
|
||||
colors = get_accent_colors()
|
||||
return {
|
||||
"closed": material_icon(
|
||||
icon_name="fiber_manual_record", filled=True, color=colors.success
|
||||
),
|
||||
"halted": material_icon(
|
||||
icon_name="fiber_manual_record", filled=True, color=colors.warning
|
||||
),
|
||||
"aborted": material_icon(
|
||||
icon_name="fiber_manual_record", filled=True, color=colors.emergency
|
||||
),
|
||||
"unknown": material_icon(
|
||||
icon_name="fiber_manual_record", filled=True, color=QtGui.QColor("#b0bec5")
|
||||
),
|
||||
}
|
||||
|
||||
def apply_theme(self, theme: str | None = None):
|
||||
"""Apply the theme to the widget."""
|
||||
colors = get_accent_colors()
|
||||
self.status_colors = {
|
||||
"closed": colors.success,
|
||||
"halted": colors.warning,
|
||||
"aborted": colors.emergency,
|
||||
}
|
||||
self.status_icons = self._create_status_icons()
|
||||
self.repaint()
|
||||
|
||||
def _add_overlay(self):
|
||||
self._overlay_widget = QtWidgets.QWidget()
|
||||
self._overlay_widget.setStyleSheet("background-color: rgba(240, 240, 240, 180);")
|
||||
self._overlay_widget.setAutoFillBackground(True)
|
||||
self._overlay_layout = QtWidgets.QVBoxLayout()
|
||||
self._overlay_layout.setAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
|
||||
self._overlay_widget.setLayout(self._overlay_layout)
|
||||
|
||||
self._spinner = SpinnerWidget(parent=self)
|
||||
self._spinner.setFixedSize(QtCore.QSize(32, 32))
|
||||
self._overlay_layout.addWidget(self._spinner)
|
||||
self._container.addWidget(self._overlay_widget)
|
||||
|
||||
def _start_waiting_display(self):
|
||||
self._overlay_widget.setVisible(True)
|
||||
self._spinner.start()
|
||||
QtWidgets.QApplication.processEvents()
|
||||
|
||||
def _stop_waiting_display(self):
|
||||
self._overlay_widget.setVisible(False)
|
||||
self._spinner.stop()
|
||||
QtWidgets.QApplication.processEvents()
|
||||
|
||||
def _current_item_changed(
|
||||
self, current: QtWidgets.QTreeWidgetItem, previous: QtWidgets.QTreeWidgetItem
|
||||
):
|
||||
@@ -145,9 +202,14 @@ class ScanHistoryView(BECWidget, QtWidgets.QTreeWidget):
|
||||
@SafeSlot()
|
||||
def refresh(self):
|
||||
"""Refresh the scan history view."""
|
||||
while len(self.scan_history) > 0:
|
||||
self.remove_scan(index=0)
|
||||
self.bec_scan_history_manager.refresh_scan_history()
|
||||
# pylint: disable=protected-access
|
||||
if self.client.history._scan_history_loaded_event.is_set():
|
||||
while len(self.scan_history) > 0:
|
||||
self.remove_scan(index=0)
|
||||
self.bec_scan_history_manager.refresh_scan_history()
|
||||
return
|
||||
else:
|
||||
logger.info("Scan history not loaded yet, waiting for it to be loaded.")
|
||||
|
||||
@SafeSlot(dict)
|
||||
def update_history(self, msg_dump: dict):
|
||||
@@ -156,6 +218,20 @@ class ScanHistoryView(BECWidget, QtWidgets.QTreeWidget):
|
||||
self.add_scan(msg)
|
||||
self.ensure_history_max_length()
|
||||
|
||||
@SafeSlot(list)
|
||||
def update_full_history(self, all_messages: list[dict]):
|
||||
"""Update the scan history with a full list of scan data."""
|
||||
messages = []
|
||||
for msg_dump in all_messages:
|
||||
msg = ScanHistoryMessage(**msg_dump)
|
||||
messages.append(msg)
|
||||
if len(messages) >= self.max_length:
|
||||
messages.pop(0)
|
||||
messages.sort(key=lambda m: m.scan_number, reverse=False)
|
||||
self.add_scans(messages)
|
||||
self.ensure_history_max_length()
|
||||
self._stop_waiting_display()
|
||||
|
||||
def ensure_history_max_length(self) -> None:
|
||||
"""
|
||||
Method to ensure the scan history does not exceed the maximum length.
|
||||
@@ -172,6 +248,34 @@ class ScanHistoryView(BECWidget, QtWidgets.QTreeWidget):
|
||||
"""
|
||||
Add a scan entry to the tree widget.
|
||||
|
||||
Args:
|
||||
msg (ScanHistoryMessage): The scan history message containing scan details.
|
||||
"""
|
||||
self._add_scan_to_scan_history(msg)
|
||||
tree_item = self._setup_tree_item(msg)
|
||||
self.insertTopLevelItem(0, tree_item)
|
||||
|
||||
def _setup_tree_item(self, msg: ScanHistoryMessage) -> QtWidgets.QTreeWidgetItem:
|
||||
"""Setup a tree item for the scan history message.
|
||||
|
||||
Args:
|
||||
msg (ScanHistoryMessage): The scan history message containing scan details.
|
||||
|
||||
Returns:
|
||||
QtWidgets.QTreeWidgetItem: The tree item representing the scan history message.
|
||||
"""
|
||||
tree_item = QtWidgets.QTreeWidgetItem([str(msg.scan_number), msg.scan_name, ""])
|
||||
icon = self.status_icons.get(msg.exit_status, self.status_icons["unknown"])
|
||||
tree_item.setIcon(2, icon)
|
||||
tree_item.setExpanded(False)
|
||||
for col in range(tree_item.columnCount()):
|
||||
tree_item.setToolTip(col, f"Status: {msg.exit_status}")
|
||||
return tree_item
|
||||
|
||||
def _add_scan_to_scan_history(self, msg: ScanHistoryMessage):
|
||||
"""
|
||||
Add a scan message to the internal scan history list and update the tree widget.
|
||||
|
||||
Args:
|
||||
msg (ScanHistoryMessage): The scan history message containing scan details.
|
||||
"""
|
||||
@@ -180,25 +284,25 @@ class ScanHistoryView(BECWidget, QtWidgets.QTreeWidget):
|
||||
f"Old scan history entry fo scan {msg.scan_id} without stored_data_info, skipping."
|
||||
)
|
||||
return
|
||||
if msg in self.scan_history:
|
||||
if msg.scan_id in self.scan_history_ids:
|
||||
logger.info(f"Scan {msg.scan_id} already in history, skipping.")
|
||||
return
|
||||
self.scan_history.insert(0, msg)
|
||||
tree_item = QtWidgets.QTreeWidgetItem([str(msg.scan_number), msg.scan_name, ""])
|
||||
color = QtGui.QColor(self.status_colors.get(msg.exit_status, "#b0bec5"))
|
||||
pix = QtGui.QPixmap(10, 10)
|
||||
pix.fill(QtCore.Qt.transparent)
|
||||
with QtGui.QPainter(pix) as p:
|
||||
p.setRenderHint(QtGui.QPainter.Antialiasing)
|
||||
p.setPen(QtCore.Qt.NoPen)
|
||||
p.setBrush(color)
|
||||
p.drawEllipse(0, 0, 10, 10)
|
||||
tree_item.setIcon(2, QtGui.QIcon(pix))
|
||||
tree_item.setForeground(2, QtGui.QBrush(color))
|
||||
for col in range(tree_item.columnCount()):
|
||||
tree_item.setToolTip(col, f"Status: {msg.exit_status}")
|
||||
self.insertTopLevelItem(0, tree_item)
|
||||
tree_item.setExpanded(False)
|
||||
self.scan_history_ids.add(msg.scan_id)
|
||||
|
||||
def add_scans(self, messages: list[ScanHistoryMessage]):
|
||||
"""
|
||||
Add multiple scan entries to the tree widget.
|
||||
|
||||
Args:
|
||||
messages (list[ScanHistoryMessage]): List of scan history messages containing scan details.
|
||||
"""
|
||||
tree_items = []
|
||||
for msg in messages:
|
||||
self._add_scan_to_scan_history(msg)
|
||||
tree_items.append(self._setup_tree_item(msg))
|
||||
# Insert for insertTopLevelItems needs to reversed to keep order of scan_history list
|
||||
self.insertTopLevelItems(0, tree_items[::-1])
|
||||
|
||||
def remove_scan(self, index: int):
|
||||
"""
|
||||
@@ -212,6 +316,7 @@ class ScanHistoryView(BECWidget, QtWidgets.QTreeWidget):
|
||||
index = len(self.scan_history) + index
|
||||
try:
|
||||
msg = self.scan_history.pop(index)
|
||||
self.scan_history_ids.remove(msg.scan_id)
|
||||
self.no_scan_selected.emit()
|
||||
except IndexError:
|
||||
logger.warning(f"Invalid index {index} for removing scan entry from history.")
|
||||
|
||||
@@ -2,11 +2,13 @@ from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import traceback
|
||||
from typing import TYPE_CHECKING, Any
|
||||
from typing import TYPE_CHECKING, Any, Sequence
|
||||
|
||||
import numpy as np
|
||||
from bec_lib.device import Device, Signal
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_qthemes import material_icon
|
||||
from qtpy.QtCore import Qt
|
||||
from qtpy.QtCore import Signal as QSignal
|
||||
from qtpy.QtWidgets import (
|
||||
QApplication,
|
||||
@@ -20,17 +22,10 @@ from qtpy.QtWidgets import (
|
||||
QWidget,
|
||||
)
|
||||
|
||||
from bec_widgets.utils.bec_connector import ConnectionConfig
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
from bec_widgets.utils.colors import get_accent_colors
|
||||
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
|
||||
from bec_widgets.utils.ophyd_kind_util import Kind
|
||||
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import (
|
||||
DeviceInputConfig,
|
||||
)
|
||||
from bec_widgets.widgets.control.device_input.base_classes.device_signal_input_base import (
|
||||
DeviceSignalInputBaseConfig,
|
||||
)
|
||||
from bec_widgets.widgets.control.device_input.device_line_edit.device_line_edit import (
|
||||
DeviceLineEdit,
|
||||
)
|
||||
@@ -48,8 +43,9 @@ class ChoiceDialog(QDialog):
|
||||
def __init__(
|
||||
self,
|
||||
parent: QWidget | None = None,
|
||||
config: ConnectionConfig | None = None,
|
||||
client: BECClient | None = None,
|
||||
device: str | None = None,
|
||||
signal: str | None = None,
|
||||
show_hinted: bool = True,
|
||||
show_normal: bool = False,
|
||||
show_config: bool = False,
|
||||
@@ -63,18 +59,8 @@ class ChoiceDialog(QDialog):
|
||||
|
||||
layout = QHBoxLayout()
|
||||
|
||||
config_dict = config.model_dump() if config is not None else {}
|
||||
self._device_config = DeviceInputConfig.model_validate(config_dict)
|
||||
self._signal_config = DeviceSignalInputBaseConfig.model_validate(config_dict)
|
||||
self._device_field = DeviceLineEdit(
|
||||
config=self._device_config, parent=parent, client=client
|
||||
)
|
||||
self._signal_field = SignalComboBox(
|
||||
config=self._signal_config,
|
||||
device=self._signal_config.device,
|
||||
parent=parent,
|
||||
client=client,
|
||||
)
|
||||
self._device_field = DeviceLineEdit(parent=parent, client=client)
|
||||
self._signal_field = SignalComboBox(parent=parent, client=client)
|
||||
layout.addWidget(self._device_field)
|
||||
layout.addWidget(self._signal_field)
|
||||
|
||||
@@ -89,7 +75,10 @@ class ChoiceDialog(QDialog):
|
||||
|
||||
self.setLayout(layout)
|
||||
self._device_field.textChanged.connect(self._update_device)
|
||||
self._device_field.setText(config.device if config is not None else "")
|
||||
if device:
|
||||
self._device_field.set_device(device)
|
||||
if signal and signal in set(s[0] for s in self._signal_field.signals):
|
||||
self._signal_field.set_signal(signal)
|
||||
|
||||
def _display_error(self):
|
||||
try:
|
||||
@@ -123,11 +112,19 @@ class ChoiceDialog(QDialog):
|
||||
self.accepted_output.emit(
|
||||
self._device_field.text(), self._signal_field.selected_signal_comp_name
|
||||
)
|
||||
self.cleanup()
|
||||
return super().accept()
|
||||
|
||||
def reject(self):
|
||||
self.cleanup()
|
||||
return super().reject()
|
||||
|
||||
def cleanup(self):
|
||||
self._device_field.close()
|
||||
self._signal_field.close()
|
||||
|
||||
|
||||
class SignalLabel(BECWidget, QWidget):
|
||||
|
||||
ICON_NAME = "scoreboard"
|
||||
RPC = True
|
||||
PLUGIN = True
|
||||
@@ -151,6 +148,8 @@ class SignalLabel(BECWidget, QWidget):
|
||||
"show_config_signals.setter",
|
||||
"display_array_data",
|
||||
"display_array_data.setter",
|
||||
"max_list_display_len",
|
||||
"max_list_display_len.setter",
|
||||
]
|
||||
|
||||
def __init__(
|
||||
@@ -178,7 +177,6 @@ class SignalLabel(BECWidget, QWidget):
|
||||
custom_label (str, optional): Custom label for the widget. Defaults to "".
|
||||
custom_units (str, optional): Custom units for the widget. Defaults to "".
|
||||
"""
|
||||
self._config = DeviceSignalInputBaseConfig(default=signal, device=device)
|
||||
super().__init__(parent=parent, client=client, **kwargs)
|
||||
|
||||
self._device = device
|
||||
@@ -189,6 +187,7 @@ class SignalLabel(BECWidget, QWidget):
|
||||
self._show_default_units: bool = show_default_units
|
||||
self._decimal_places = 3
|
||||
self._dtype = None
|
||||
self._max_list_display_len = 5
|
||||
|
||||
self._show_hinted_signals: bool = True
|
||||
self._show_normal_signals: bool = True
|
||||
@@ -227,9 +226,10 @@ class SignalLabel(BECWidget, QWidget):
|
||||
|
||||
def _create_dialog(self):
|
||||
return ChoiceDialog(
|
||||
config=self._config,
|
||||
parent=self,
|
||||
client=self.client,
|
||||
device=self.device,
|
||||
signal=self._signal_key,
|
||||
show_config=self.show_config_signals,
|
||||
show_normal=self.show_normal_signals,
|
||||
show_hinted=self.show_hinted_signals,
|
||||
@@ -280,7 +280,7 @@ class SignalLabel(BECWidget, QWidget):
|
||||
return
|
||||
self._value = value
|
||||
self._units = self._signal_info.get("egu", "")
|
||||
self._dtype = self._signal_info.get("dtype", "float")
|
||||
self._dtype = self._signal_info.get("dtype")
|
||||
|
||||
@SafeSlot(dict, dict)
|
||||
def on_device_readback(self, msg: dict, metadata: dict) -> None:
|
||||
@@ -305,11 +305,13 @@ class SignalLabel(BECWidget, QWidget):
|
||||
except KeyError:
|
||||
return "", {}
|
||||
if signal_info["kind_str"] == Kind.hinted.name:
|
||||
return signal_info["obj_name"], signal_info
|
||||
return signal_info["obj_name"], signal_info.get("describe", {})
|
||||
else:
|
||||
return f"{self._device}_{self._signal}", signal_info
|
||||
return f"{self._device}_{self._signal}", signal_info.get("describe", {})
|
||||
elif isinstance(self._device_obj, Signal):
|
||||
return self._device, self._device_obj._info["describe_configuration"]
|
||||
info = self._device_obj._info["describe_configuration"][self._device]
|
||||
info["egu"] = self._device_obj._info["describe_configuration"].get("egu")
|
||||
return (self._device, info)
|
||||
return "", {}
|
||||
|
||||
@SafeProperty(str)
|
||||
@@ -322,7 +324,6 @@ class SignalLabel(BECWidget, QWidget):
|
||||
self.disconnect_device()
|
||||
self._device = value
|
||||
self._device_obj = self.dev.get(self._device)
|
||||
self._config.device = value
|
||||
self.connect_device()
|
||||
self._update_label()
|
||||
|
||||
@@ -335,7 +336,6 @@ class SignalLabel(BECWidget, QWidget):
|
||||
def signal(self, value: str) -> None:
|
||||
self.disconnect_device()
|
||||
self._signal = value
|
||||
self._config.default = value
|
||||
self.connect_device()
|
||||
self._update_label()
|
||||
|
||||
@@ -369,6 +369,16 @@ class SignalLabel(BECWidget, QWidget):
|
||||
self._custom_label = value
|
||||
self._update_label()
|
||||
|
||||
@SafeProperty(str)
|
||||
def max_list_display_len(self) -> int:
|
||||
"""For small lists, the max length to display"""
|
||||
return self._max_list_display_len
|
||||
|
||||
@max_list_display_len.setter
|
||||
def max_list_display_len(self, value: int) -> None:
|
||||
self._max_list_display_len = value
|
||||
self.set_display_value(self._value)
|
||||
|
||||
@SafeProperty(str)
|
||||
def custom_units(self) -> str:
|
||||
"""Use a custom unit string"""
|
||||
@@ -429,6 +439,11 @@ class SignalLabel(BECWidget, QWidget):
|
||||
def _format_value(self, value: Any):
|
||||
if self._dtype == "array" and not self.display_array_data:
|
||||
return "ARRAY DATA"
|
||||
if not isinstance(value, str) and isinstance(value, (Sequence, np.ndarray)):
|
||||
if len(value) < self._max_list_display_len:
|
||||
return str(value)
|
||||
else:
|
||||
return "ARRAY DATA"
|
||||
if self._decimal_places == 0:
|
||||
return value
|
||||
try:
|
||||
@@ -468,7 +483,6 @@ class SignalLabel(BECWidget, QWidget):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
app = QApplication(sys.argv)
|
||||
w = QWidget()
|
||||
w.setLayout(QVBoxLayout())
|
||||
|
||||
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
|
||||
|
||||
[project]
|
||||
name = "bec_widgets"
|
||||
version = "2.33.1"
|
||||
version = "2.34.0"
|
||||
description = "BEC Widgets"
|
||||
requires-python = ">=3.10"
|
||||
classifiers = [
|
||||
@@ -39,6 +39,8 @@ dev = [
|
||||
"pytest-xvfb~=3.0",
|
||||
"pytest~=8.0",
|
||||
"pytest-cov~=6.1.1",
|
||||
"watchdog~=6.0",
|
||||
"pre_commit~=4.2",
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
|
||||
255
tests/unit_tests/test_plugin_creator.py
Normal file
255
tests/unit_tests/test_plugin_creator.py
Normal file
@@ -0,0 +1,255 @@
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
from pathlib import Path
|
||||
from time import sleep
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, call, patch
|
||||
|
||||
import copier
|
||||
import pytest
|
||||
from bec_lib.utils.plugin_manager import main
|
||||
from bec_lib.utils.plugin_manager._util import _goto_dir
|
||||
from typer.testing import CliRunner
|
||||
|
||||
from bec_widgets.utils.bec_plugin_manager.create.widget import _commit_added_widget, _widget_exists
|
||||
|
||||
PLUGIN_REPO = "https://github.com/bec-project/plugin_copier_template.git"
|
||||
|
||||
REPLACEMENT_UI_CONTENTS = """<?xml version="1.0" encoding="UTF-8"?>
|
||||
<ui version="4.0">
|
||||
<class>testWidget6</class>
|
||||
<widget class="QWidget" name="testWidget6">
|
||||
<property name="geometry">
|
||||
<rect>
|
||||
<x>0</x>
|
||||
<y>0</y>
|
||||
<width>539</width>
|
||||
<height>287</height>
|
||||
</rect>
|
||||
</property>
|
||||
<widget class="Waveform" name="waveform">
|
||||
<property name="geometry">
|
||||
<rect>
|
||||
<x>30</x>
|
||||
<y>0</y>
|
||||
<width>361</width>
|
||||
<height>125</height>
|
||||
</rect>
|
||||
</property>
|
||||
</widget>
|
||||
</widget>
|
||||
<customwidgets>
|
||||
<customwidget>
|
||||
<class>Waveform</class>
|
||||
<extends>QWidget</extends>
|
||||
<header>waveform</header>
|
||||
</customwidget>
|
||||
</customwidgets>
|
||||
<resources/>
|
||||
<connections/>
|
||||
</ui>
|
||||
"""
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def runner():
|
||||
return CliRunner()
|
||||
|
||||
|
||||
def test_app_has_widget_commands(runner: CliRunner):
|
||||
result = runner.invoke(main._app, ["create", "--help"])
|
||||
assert "widget" in result.output
|
||||
|
||||
|
||||
def test_create_widget_takes_name(runner: CliRunner):
|
||||
result = runner.invoke(main._app, ["create", "widget"])
|
||||
assert "Missing argument 'NAME'." in result.output
|
||||
|
||||
|
||||
@patch("bec_widgets.utils.bec_plugin_manager.create.widget.logger")
|
||||
@patch("bec_widgets.utils.bec_plugin_manager.create.widget.make_commit")
|
||||
@patch("bec_widgets.utils.bec_plugin_manager.create.widget.git_stage_files")
|
||||
def test_make_commit(stage: MagicMock, commit: MagicMock, logger: MagicMock):
|
||||
repo = Path("test_path")
|
||||
_commit_added_widget(repo, "test")
|
||||
assert stage.call_count == 2
|
||||
stage.assert_has_calls(
|
||||
[
|
||||
call(repo, [".copier-answers.yml"]),
|
||||
call(repo / repo.name / "bec_widgets" / "widgets" / "test", []),
|
||||
]
|
||||
)
|
||||
commit.assert_called_with(repo, "plugin-manager added new widget: test")
|
||||
logger.info.assert_called_with("Committing new widget test")
|
||||
|
||||
|
||||
def test_widget_exists_function():
|
||||
assert not _widget_exists([], "test_widget")
|
||||
assert _widget_exists([{"name": "test_widget", "use_ui": True}], "test_widget")
|
||||
|
||||
|
||||
def test_editor_cb(runner):
|
||||
result = runner.invoke(main._app, ["create", "widget", "test", "--no-use-ui", "--open-editor"])
|
||||
assert result.exit_code == 2
|
||||
assert "Invalid value" in result.output
|
||||
assert "Can only open" in result.output
|
||||
|
||||
|
||||
class TestAddWidgetVariants:
|
||||
@pytest.fixture(scope="class", autouse=True)
|
||||
def setup_env(self, tmp_path_factory: pytest.TempPathFactory):
|
||||
TestAddWidgetVariants._tmp_plugin_dir = tmp_path_factory.mktemp("test_plugin")
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def cleanup_repo(self):
|
||||
yield
|
||||
subprocess.run(["git", "reset", "--hard"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def git_repo(self):
|
||||
project = TestAddWidgetVariants._tmp_plugin_dir / "test_plugin"
|
||||
with _goto_dir(TestAddWidgetVariants._tmp_plugin_dir):
|
||||
subprocess.run(["git", "clone", PLUGIN_REPO])
|
||||
os.makedirs(project)
|
||||
with _goto_dir(project):
|
||||
subprocess.run(["git", "init", "-b", "main"])
|
||||
subprocess.run(["git", "config", "user.email", "test"])
|
||||
subprocess.run(["git", "config", "user.name", "test"])
|
||||
copier.run_copy(
|
||||
str(TestAddWidgetVariants._tmp_plugin_dir / "plugin_copier_template"),
|
||||
str(project),
|
||||
defaults=True,
|
||||
data={
|
||||
"project_name": "test_plugin",
|
||||
"widget_plugins_input": [{"name": "test_widget", "use_ui": True}],
|
||||
},
|
||||
unsafe=True,
|
||||
)
|
||||
yield project
|
||||
|
||||
@patch("bec_widgets.utils.bec_plugin_manager.create.widget.plugin_repo_path")
|
||||
def test_add_widget_with_ui(self, plugin_repo_path, runner: CliRunner, git_repo: Path):
|
||||
plugin_repo_path.return_value = str(git_repo)
|
||||
result = runner.invoke(
|
||||
main._app, ["create", "widget", "test_widget_2", "--use-ui", "--no-open-editor"]
|
||||
)
|
||||
assert result.exit_code == 0, result.output
|
||||
widget_dir = git_repo / "test_plugin" / "bec_widgets" / "widgets" / "test_widget_2"
|
||||
assert os.path.isdir(widget_dir)
|
||||
assert os.path.isfile(widget_dir / "test_widget_2.py")
|
||||
assert os.path.isfile(widget_dir / "test_widget_2.ui")
|
||||
assert os.path.isfile(widget_dir / "test_widget_2_ui.py")
|
||||
|
||||
@patch("bec_widgets.utils.bec_plugin_manager.create.widget.plugin_repo_path")
|
||||
def test_add_widget_without_ui(self, plugin_repo_path, runner: CliRunner, git_repo: Path):
|
||||
plugin_repo_path.return_value = str(git_repo)
|
||||
result = runner.invoke(
|
||||
main._app, ["create", "widget", "test_widget_3", "--no-use-ui", "--no-open-editor"]
|
||||
)
|
||||
assert result.exit_code == 0, result.output
|
||||
widget_dir = git_repo / "test_plugin" / "bec_widgets" / "widgets" / "test_widget_3"
|
||||
assert os.path.isdir(widget_dir)
|
||||
assert os.path.isfile(widget_dir / "test_widget_3.py")
|
||||
assert not os.path.isfile(widget_dir / "test_widget_3.ui")
|
||||
assert not os.path.isfile(widget_dir / "test_widget_3_ui.py")
|
||||
|
||||
@patch("bec_widgets.utils.bec_plugin_manager.create.widget.logger")
|
||||
@patch("bec_widgets.utils.bec_plugin_manager.create.widget.plugin_repo_path")
|
||||
def test_no_add_widget_dupe_name(
|
||||
self, plugin_repo_path, logger, runner: CliRunner, git_repo: Path
|
||||
):
|
||||
plugin_repo_path.return_value = str(git_repo)
|
||||
result = runner.invoke(
|
||||
main._app, ["create", "widget", "test_widget", "--no-use-ui", "--no-open-editor"]
|
||||
)
|
||||
assert result.exit_code == -1, result.output
|
||||
assert "already exists!" in logger.error.mock_calls[0].args[0]
|
||||
|
||||
@patch("bec_widgets.utils.bec_plugin_manager.create.widget.logger")
|
||||
@patch("bec_widgets.utils.bec_plugin_manager.create.widget.plugin_repo_path")
|
||||
def test_no_add_widget_bad_name(
|
||||
self, plugin_repo_path, logger, runner: CliRunner, git_repo: Path
|
||||
):
|
||||
plugin_repo_path.return_value = str(git_repo)
|
||||
result = runner.invoke(
|
||||
main._app, ["create", "widget", "12345", "--no-use-ui", "--no-open-editor"]
|
||||
)
|
||||
assert result.exit_code == -1, result.output
|
||||
assert "not a valid name for a widget" in logger.error.mock_calls[0].args[0]
|
||||
|
||||
@patch("bec_widgets.utils.bec_plugin_manager.create.widget.copier")
|
||||
@patch("bec_widgets.utils.bec_plugin_manager.create.widget.logger")
|
||||
@patch("bec_widgets.utils.bec_plugin_manager.create.widget.plugin_repo_path")
|
||||
def test_copier_error_logged(
|
||||
self, plugin_repo_path, logger, copier, runner: CliRunner, git_repo: Path
|
||||
):
|
||||
class CopierFailure(Exception): ...
|
||||
|
||||
copier.run_update.side_effect = CopierFailure
|
||||
plugin_repo_path.return_value = str(git_repo)
|
||||
result = runner.invoke(
|
||||
main._app, ["create", "widget", "test_widget_4", "--no-use-ui", "--no-open-editor"]
|
||||
)
|
||||
assert result.exit_code == -1, result.output
|
||||
assert "CopierFailure" in logger.error.mock_calls[0].args[0]
|
||||
|
||||
@patch("bec_widgets.utils.bec_plugin_manager.create.widget.open_and_watch_ui_editor")
|
||||
@patch("bec_widgets.utils.bec_plugin_manager.create.widget.plugin_repo_path")
|
||||
def test_editor_opened_on_success(
|
||||
self, plugin_repo_path, open_editor, runner: CliRunner, git_repo: Path
|
||||
):
|
||||
plugin_repo_path.return_value = str(git_repo)
|
||||
runner.invoke(main._app, ["create", "widget", "TeSt_wiDgeT_5", "--use-ui", "--open-editor"])
|
||||
open_editor.assert_called_with("test_widget_5")
|
||||
|
||||
@patch("bec_widgets.utils.bec_plugin_manager.edit_ui.logger")
|
||||
@patch("bec_widgets.utils.bec_plugin_manager.edit_ui.open_designer")
|
||||
@patch("bec_widgets.utils.bec_plugin_manager.edit_ui.plugin_repo_path")
|
||||
@patch("bec_widgets.utils.bec_plugin_manager.create.widget.plugin_repo_path")
|
||||
@patch("bec_widgets.utils.bec_plugin_manager.edit_ui.plugin_package_name")
|
||||
def test_widget_editor_watcher(
|
||||
self,
|
||||
plugin_package_name,
|
||||
plugin_repo_path,
|
||||
plugin_repo_path_2,
|
||||
open_designer,
|
||||
logger: MagicMock,
|
||||
runner: CliRunner,
|
||||
git_repo: Path,
|
||||
):
|
||||
plugin_repo_path.return_value = str(git_repo)
|
||||
plugin_repo_path_2.return_value = str(git_repo)
|
||||
plugin_package_name.return_value = git_repo.name
|
||||
|
||||
widget_dir = git_repo / "test_plugin" / "bec_widgets" / "widgets" / "test_widget_6"
|
||||
widget_ui_file = widget_dir / "test_widget_6.ui"
|
||||
compiled_widget_ui_file = widget_dir / "test_widget_6_ui.py"
|
||||
|
||||
test_collector = SimpleNamespace()
|
||||
|
||||
def test_function(args: list[str]):
|
||||
test_collector.ui_file = args[0]
|
||||
with open(compiled_widget_ui_file) as f:
|
||||
test_collector.initial_compiled_ui_contents = f.read()
|
||||
with open(args[0], "w") as f:
|
||||
f.write(REPLACEMENT_UI_CONTENTS)
|
||||
start = time.monotonic()
|
||||
while call("done!") not in logger.success.call_args_list:
|
||||
time.sleep(0.05)
|
||||
if time.monotonic() - start > 5:
|
||||
raise TimeoutError("Waiting for recompilation timed out.")
|
||||
with open(compiled_widget_ui_file) as f:
|
||||
test_collector.final_compiled_ui_contents = f.read()
|
||||
|
||||
open_designer.side_effect = test_function
|
||||
result = runner.invoke(
|
||||
main._app, ["create", "widget", "test_widget_6", "--use-ui", "--open-editor"]
|
||||
)
|
||||
|
||||
assert result.exit_code == 0, result.output
|
||||
assert test_collector.ui_file == str(widget_ui_file)
|
||||
assert (
|
||||
test_collector.initial_compiled_ui_contents != test_collector.final_compiled_ui_contents
|
||||
)
|
||||
assert "" in test_collector.final_compiled_ui_contents
|
||||
@@ -287,6 +287,23 @@ def test_scan_history_view_refresh(qtbot, scan_history_view, scan_history_msg, s
|
||||
assert scan_history_view.topLevelItemCount() == 0
|
||||
|
||||
|
||||
def test_scan_history_update_full_history(
|
||||
qtbot, scan_history_view, scan_history_msg, scan_history_msg_2
|
||||
):
|
||||
"""Test the update_full_history method of ScanHistoryView."""
|
||||
# Wait spinner should be visible
|
||||
scan_history_view.update_full_history(
|
||||
[scan_history_msg.model_dump(), scan_history_msg_2.model_dump()]
|
||||
)
|
||||
assert len(scan_history_view.scan_history) == 2
|
||||
assert scan_history_view.topLevelItemCount() == 2
|
||||
assert scan_history_view.scan_history[0] == scan_history_msg_2 # new first item
|
||||
assert scan_history_view.scan_history[1] == scan_history_msg # old second item
|
||||
# Wait spinner should be hidden
|
||||
assert scan_history_view._overlay_widget.isVisible() is False
|
||||
assert scan_history_view._spinner.isVisible() is False
|
||||
|
||||
|
||||
def test_scan_history_browser(qtbot, scan_history_browser, scan_history_msg, scan_history_msg_2):
|
||||
"""Test the initialization of ScanHistoryBrowser."""
|
||||
assert isinstance(scan_history_browser.scan_history_view, ScanHistoryView)
|
||||
@@ -298,14 +315,14 @@ def test_scan_history_browser(qtbot, scan_history_browser, scan_history_msg, sca
|
||||
scan_history_browser.scan_history_view.update_history(scan_history_msg_2.model_dump())
|
||||
|
||||
assert len(scan_history_browser.scan_history_view.scan_history) == 2
|
||||
assert scan_history_browser.scan_history_view.topLevelItemCount() == 2
|
||||
# Click on first scan item history to select it
|
||||
qtbot.mouseClick(
|
||||
scan_history_browser.scan_history_view.viewport(),
|
||||
QtCore.Qt.LeftButton,
|
||||
pos=scan_history_browser.scan_history_view.visualItemRect(
|
||||
scan_history_browser.scan_history_view.topLevelItem(0)
|
||||
).center(),
|
||||
)
|
||||
# TODO #771 ; Multiple clicks to the QTreeView item fail, but only in the CI, not locally.
|
||||
# Simulate a mouse click without qtbot.mouseClick as this is unstable and currently fails in CI
|
||||
item = scan_history_browser.scan_history_view.topLevelItem(0)
|
||||
scan_history_browser.scan_history_view.setCurrentItem(item)
|
||||
scan_history_browser.scan_history_view.itemClicked.emit(item, 0)
|
||||
|
||||
assert scan_history_browser.scan_history_view.currentIndex().row() == 0
|
||||
|
||||
# Both metadata and device viewers should be updated with the first scan
|
||||
@@ -320,29 +337,6 @@ def test_scan_history_browser(qtbot, scan_history_browser, scan_history_msg, sca
|
||||
timeout=2000,
|
||||
)
|
||||
|
||||
# TODO #771 ; Multiple clicks to the QTreeView item fail, but only in the CI, not locally.
|
||||
# Click on second scan item history to select it
|
||||
# qtbot.mouseClick(
|
||||
# scan_history_browser.scan_history_view.viewport(),
|
||||
# QtCore.Qt.LeftButton,
|
||||
# pos=scan_history_browser.scan_history_view.visualItemRect(
|
||||
# scan_history_browser.scan_history_view.topLevelItem(1)
|
||||
# ).center(),
|
||||
# )
|
||||
# assert scan_history_browser.scan_history_view.currentIndex().row() == 1
|
||||
|
||||
# # Both metadata and device viewers should be updated with the first scan
|
||||
# qtbot.waitUntil(
|
||||
# lambda: scan_history_browser.scan_history_metadata_viewer.scan_history_msg
|
||||
# == scan_history_msg,
|
||||
# timeout=2000,
|
||||
# )
|
||||
# qtbot.waitUntil(
|
||||
# lambda: scan_history_browser.scan_history_device_viewer.scan_history_msg
|
||||
# == scan_history_msg,
|
||||
# timeout=2000,
|
||||
# )
|
||||
|
||||
callback_args = []
|
||||
|
||||
def plotting_callback(device_name, signal_name, msg):
|
||||
|
||||
@@ -215,9 +215,7 @@ def test_set_existing_device_and_signal(signal_label: SignalLabel, qtbot):
|
||||
signal_label.device = "samx"
|
||||
signal_label.signal = "readback"
|
||||
assert signal_label._device == "samx"
|
||||
assert signal_label._config.device == "samx"
|
||||
assert signal_label._signal == "readback"
|
||||
assert signal_label._config.default == "readback"
|
||||
|
||||
|
||||
def test_set_nonexisting_device_and_signal(signal_label: SignalLabel, qtbot):
|
||||
@@ -225,12 +223,10 @@ def test_set_nonexisting_device_and_signal(signal_label: SignalLabel, qtbot):
|
||||
signal_label.device = "samq"
|
||||
signal_label.signal = "readfront"
|
||||
assert signal_label._device == "samq"
|
||||
assert signal_label._config.device == "samq"
|
||||
signal_label._manual_read()
|
||||
signal_label.set_display_value(signal_label._value)
|
||||
assert signal_label._display.text() == "__"
|
||||
assert signal_label._signal == "readfront"
|
||||
assert signal_label._config.default == "readfront"
|
||||
signal_label._manual_read()
|
||||
signal_label.set_display_value(signal_label._value)
|
||||
assert signal_label._display.text() == "__"
|
||||
@@ -256,3 +252,12 @@ def test_handle_readback(signal_label: SignalLabel, qtbot):
|
||||
)
|
||||
assert signal_label._display.text() == "0.993 μm"
|
||||
assert signal_label._display.toolTip() == ""
|
||||
|
||||
|
||||
def test_handle_lists(signal_label: SignalLabel, qtbot):
|
||||
signal_label.custom_units = ""
|
||||
signal_label.set_display_value([1, 2, 3, 4])
|
||||
assert signal_label._display.text() == "[1, 2, 3, 4]"
|
||||
signal_label.max_list_display_len = 2
|
||||
signal_label.set_display_value([1, 2, 3, 4])
|
||||
assert signal_label._display.text() == "ARRAY DATA"
|
||||
|
||||
Reference in New Issue
Block a user