1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-07 09:17:53 +02:00

Compare commits

..

1 Commits

Author SHA1 Message Date
c5c4493997 ci: include lines with >=3 characters in report 2025-05-15 09:53:15 +02:00
23 changed files with 1603 additions and 2632 deletions

View File

@@ -1,26 +0,0 @@
---
name: Bug report
about: Create a report to help us improve
title: ''
labels: ''
assignees: ''
---
## Bug report
## Summary
[Provide a brief description of the bug.]
## Expected Behavior vs Actual Behavior
[Describe what you expected to happen and what actually happened.]
## Steps to Reproduce
[Outline the steps that lead to the bug's occurrence. Be specific and provide a clear sequence of actions.]
## Related Issues
[Paste links to any related issues or feature requests.]

View File

@@ -1,48 +0,0 @@
---
name: Feature request
about: Suggest an idea for this project
title: ''
labels: ''
assignees: ''
---
## Feature Summary
[Provide a brief and clear summary of the new feature you are requesting]
## Problem Description
[Explain the problem or need that this feature aims to address. Be specific about the issues or gaps in the current functionality]
## Use Case
[Describe a real-world scenario or use case where this feature would be beneficial. Explain how it would improve the user experience or workflow]
## Proposed Solution
[If you have a specific solution in mind, describe it here. Explain how it would work and how it would address the problem described above]
## Benefits
[Explain the benefits and advantages of implementing this feature. Highlight how it adds value to the product or improves user satisfaction]
## Alternatives Considered
[If you've considered alternative solutions or workarounds, mention them here. Explain why the proposed feature is the preferred option]
## Impact on Existing Functionality
[Discuss how the new feature might impact or interact with existing features. Address any potential conflicts or dependencies]
## Priority
[Assign a priority level to the feature request based on its importance. Use a scale such as Low, Medium, High]
## Attachments
[Include any relevant attachments, such as sketches, diagrams, or references that can help the development team understand your feature request better]
## Additional Information
[Provide any additional information that might be relevant to the feature request, such as user feedback, market trends, or similar features in other products]

View File

@@ -14,7 +14,7 @@ jobs:
steps:
- uses: actions/github-script@v7
id: script
if: github.event_name == 'push' && github.event.ref_type != 'tag'
if: github.event_name == 'push'
with:
script: |
const prs = await github.rest.pulls.list({

View File

@@ -55,7 +55,7 @@ jobs:
- name: Run Pytest with Coverage
id: coverage
run: pytest --random-order --cov=bec_widgets --cov-config=pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail tests/unit_tests/
run: pytest --random-order --cov --cov-config=pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail tests/unit_tests/
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5

View File

@@ -1,103 +0,0 @@
name: Continuous Delivery
on:
push:
branches:
- main
# default: least privileged permissions across all jobs
permissions:
contents: read
jobs:
release:
runs-on: ubuntu-latest
concurrency:
group: ${{ github.workflow }}-release-${{ github.ref_name }}
cancel-in-progress: false
env:
CHILD_PIPELINE_BRANCH: main # Set the branch you want for ophyd_devices
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
PROJECT_PATH: ${{ github.repository }}
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
permissions:
contents: write
steps:
# Note: We checkout the repository at the branch that triggered the workflow
# with the entire history to ensure to match PSR's release branch detection
# and history evaluation.
# However, we forcefully reset the branch to the workflow sha because it is
# possible that the branch was updated while the workflow was running. This
# prevents accidentally releasing un-evaluated changes.
- name: Setup | Checkout Repository on Release Branch
uses: actions/checkout@v4
with:
ref: ${{ github.ref_name }}
fetch-depth: 0
ssh-key: ${{ secrets.CI_DEPLOY_SSH_KEY }}
ssh-known-hosts: ${{ secrets.CI_DEPLOY_SSH_KNOWN_HOSTS }}
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Setup | Force release branch to be at workflow sha
run: |
git reset --hard ${{ github.sha }}
- name: Evaluate | Verify upstream has NOT changed
# Last chance to abort before causing an error as another PR/push was applied to
# the upstream branch while this workflow was running. This is important
# because we are committing a version change (--commit). You may omit this step
# if you have 'commit: false' in your configuration.
#
# You may consider moving this to a repo script and call it from this step instead
# of writing it in-line.
shell: bash
run: |
set +o pipefail
UPSTREAM_BRANCH_NAME="$(git status -sb | head -n 1 | cut -d' ' -f2 | grep -E '\.{3}' | cut -d'.' -f4)"
printf '%s\n' "Upstream branch name: $UPSTREAM_BRANCH_NAME"
set -o pipefail
if [ -z "$UPSTREAM_BRANCH_NAME" ]; then
printf >&2 '%s\n' "::error::Unable to determine upstream branch name!"
exit 1
fi
git fetch "${UPSTREAM_BRANCH_NAME%%/*}"
if ! UPSTREAM_SHA="$(git rev-parse "$UPSTREAM_BRANCH_NAME")"; then
printf >&2 '%s\n' "::error::Unable to determine upstream branch sha!"
exit 1
fi
HEAD_SHA="$(git rev-parse HEAD)"
if [ "$HEAD_SHA" != "$UPSTREAM_SHA" ]; then
printf >&2 '%s\n' "[HEAD SHA] $HEAD_SHA != $UPSTREAM_SHA [UPSTREAM SHA]"
printf >&2 '%s\n' "::error::Upstream has changed, aborting release..."
exit 1
fi
printf '%s\n' "Verified upstream branch has not changed, continuing with release..."
- name: Semantic Version Release
id: release
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
pip install python-semantic-release==9.* wheel build twine
semantic-release -vv version
if [ ! -d dist ]; then echo No release will be made; exit 0; fi
twine upload dist/* -u __token__ -p $CI_PYPI_TOKEN --skip-existing
semantic-release publish

File diff suppressed because it is too large Load Diff

View File

@@ -55,7 +55,6 @@ _Widgets = {
"TextBox": "TextBox",
"VSCodeEditor": "VSCodeEditor",
"Waveform": "Waveform",
"WebConsole": "WebConsole",
"WebsiteWidget": "WebsiteWidget",
}
@@ -3502,16 +3501,6 @@ class Waveform(RPCBase):
"""
class WebConsole(RPCBase):
"""A simple widget to display a website"""
@rpc_call
def remove(self):
"""
Cleanup the BECConnector
"""
class WebsiteWidget(RPCBase):
"""A simple widget to display a website"""

View File

@@ -96,9 +96,9 @@ class FakePositioner(BECPositioner):
}
self._info = {
"signals": {
"readback": {"kind_str": "hinted"}, # hinted
"setpoint": {"kind_str": "normal"}, # normal
"velocity": {"kind_str": "config"}, # config
"readback": {"kind_str": "5"}, # hinted
"setpoint": {"kind_str": "1"}, # normal
"velocity": {"kind_str": "2"}, # config
}
}
self.signals = {

View File

@@ -17,23 +17,13 @@ class EntryValidator:
raise ValueError(f"Device '{name}' not found in current BEC session")
device = self.devices[name]
# Build list of available signal entries from device._info['signals']
signals_dict = getattr(device, "_info", {}).get("signals", {})
available_entries = [
sig.get("obj_name") for sig in signals_dict.values() if sig.get("obj_name")
]
# If no signals are found, means device is a signal, use the device name as the entry
if not available_entries:
available_entries = [name]
description = device.describe()
if entry is None or entry == "":
entry = next(iter(device._hints), name) if hasattr(device, "_hints") else name
if entry not in available_entries:
if entry not in description:
raise ValueError(
f"Entry '{entry}' not found in device '{name}' signals. "
f"Available signals: '{available_entries}'"
f"Entry '{entry}' not found in device '{name}' signals. Available signals: {description.keys()}"
)
return entry

View File

@@ -702,85 +702,6 @@ class ModularToolBar(QToolBar):
self.bundles[bundle_id].append(action_id)
self.update_separators()
def remove_action(self, action_id: str):
"""
Completely remove a single action from the toolbar.
The method takes care of both standalone actions and actions that are
part of an existing bundle.
Args:
action_id (str): Unique identifier for the action.
"""
if action_id not in self.widgets:
raise ValueError(f"Action with ID '{action_id}' does not exist.")
# Identify potential bundle membership
parent_bundle = None
for b_id, a_ids in self.bundles.items():
if action_id in a_ids:
parent_bundle = b_id
break
# 1. Remove the QAction from the QToolBar and delete it
tool_action = self.widgets.pop(action_id)
if hasattr(tool_action, "action") and tool_action.action is not None:
self.removeAction(tool_action.action)
tool_action.action.deleteLater()
# 2. Clean bundle bookkeeping if the action belonged to one
if parent_bundle:
self.bundles[parent_bundle].remove(action_id)
# If the bundle becomes empty, get rid of the bundle entry as well
if not self.bundles[parent_bundle]:
self.remove_bundle(parent_bundle)
# 3. Remove from the ordering list
self.toolbar_items = [
item
for item in self.toolbar_items
if not (item[0] == "action" and item[1] == action_id)
]
self.update_separators()
def remove_bundle(self, bundle_id: str):
"""
Remove an entire bundle (and all of its actions) from the toolbar.
Args:
bundle_id (str): Unique identifier for the bundle.
"""
if bundle_id not in self.bundles:
raise ValueError(f"Bundle '{bundle_id}' does not exist.")
# Remove every action belonging to this bundle
for action_id in list(self.bundles[bundle_id]): # copy the list
if action_id in self.widgets:
tool_action = self.widgets.pop(action_id)
if hasattr(tool_action, "action") and tool_action.action is not None:
self.removeAction(tool_action.action)
tool_action.action.deleteLater()
# Drop the bundle entry
self.bundles.pop(bundle_id, None)
# Remove bundle entry and its preceding separator (if any) from the ordering list
cleaned_items = []
skip_next_separator = False
for item_type, ident in self.toolbar_items:
if item_type == "bundle" and ident == bundle_id:
# mark to skip one following separator if present
skip_next_separator = True
continue
if skip_next_separator and item_type == "separator":
skip_next_separator = False
continue
cleaned_items.append((item_type, ident))
self.toolbar_items = cleaned_items
self.update_separators()
def contextMenuEvent(self, event):
"""
Overrides the context menu event to show toolbar actions with checkboxes and icons.

View File

@@ -36,16 +36,14 @@ class DeviceSignalInputBase(BECWidget):
Kind.config: "include_config_signals",
}
def __init__(
self,
client=None,
config: DeviceSignalInputBaseConfig | dict | None = None,
gui_id: str = None,
**kwargs,
):
self.config = self._process_config_input(config)
super().__init__(client=client, config=self.config, gui_id=gui_id, **kwargs)
def __init__(self, client=None, config=None, gui_id: str = None, **kwargs):
if config is None:
config = DeviceSignalInputBaseConfig(widget_class=self.__class__.__name__)
else:
if isinstance(config, dict):
config = DeviceSignalInputBaseConfig(**config)
self.config = config
super().__init__(client=client, config=config, gui_id=gui_id, **kwargs)
self._device = None
self.get_bec_shortcuts()
@@ -104,7 +102,10 @@ class DeviceSignalInputBase(BECWidget):
"""
self.config.signal_filter = self.signal_filter
# pylint: disable=protected-access
if not self.validate_device(self._device):
self._hinted_signals = []
self._normal_signals = []
self._config_signals = []
if self.validate_device(self._device) is False:
self._device = None
self.config.device = self._device
return
@@ -115,19 +116,27 @@ class DeviceSignalInputBase(BECWidget):
FilterIO.set_selection(widget=self, selection=[self._device])
return
device_info = device._info["signals"]
def _update(kind: Kind):
return [
if Kind.hinted in self.signal_filter:
hinted_signals = [
signal
for signal, signal_info in device_info.items()
if kind in self.signal_filter
and (signal_info.get("kind_str", None) == str(kind.name))
if (signal_info.get("kind_str", None) == str(Kind.hinted.value))
]
self._hinted_signals = _update(Kind.hinted)
self._normal_signals = _update(Kind.normal)
self._config_signals = _update(Kind.config)
self._hinted_signals = hinted_signals
if Kind.normal in self.signal_filter:
normal_signals = [
signal
for signal, signal_info in device_info.items()
if (signal_info.get("kind_str", None) == str(Kind.normal.value))
]
self._normal_signals = normal_signals
if Kind.config in self.signal_filter:
config_signals = [
signal
for signal, signal_info in device_info.items()
if (signal_info.get("kind_str", None) == str(Kind.config.value))
]
self._config_signals = config_signals
self._signals = self._hinted_signals + self._normal_signals + self._config_signals
FilterIO.set_selection(widget=self, selection=self.signals)
@@ -270,8 +279,3 @@ class DeviceSignalInputBase(BECWidget):
if signal in self.signals:
return True
return False
def _process_config_input(self, config: DeviceSignalInputBaseConfig | dict | None):
if config is None:
return DeviceSignalInputBaseConfig(widget_class=self.__class__.__name__)
return DeviceSignalInputBaseConfig.model_validate(config)

View File

@@ -53,7 +53,6 @@ class DictBackedTableModel(QAbstractTableModel):
if value in self._disallowed_keys or value in self._other_keys(index.row()):
return False
self._data[index.row()][index.column()] = str(value)
self.dataChanged.emit(index, index)
return True
return False
@@ -110,7 +109,6 @@ class DictBackedTableModel(QAbstractTableModel):
class DictBackedTable(QWidget):
delete_rows = Signal(list)
data_updated = Signal()
def __init__(self, initial_data: list[list[str]]):
"""Widget which uses a DictBackedTableModel to display an editable table
@@ -143,11 +141,6 @@ class DictBackedTable(QWidget):
self._add_button.clicked.connect(self._table_model.add_row)
self._remove_button.clicked.connect(self.delete_selected_rows)
self.delete_rows.connect(self._table_model.delete_rows)
self._table_model.dataChanged.connect(self._emit_data_updated)
def _emit_data_updated(self, *args, **kwargs):
"""Just to swallow the args"""
self.data_updated.emit()
def delete_selected_rows(self):
"""Delete rows which are part of the selection model"""

View File

@@ -43,7 +43,6 @@ class ScanMetadata(PydanticModelForm):
self._additional_metadata = DictBackedTable(initial_extras or [])
self._scan_name = scan_name or ""
self._md_schema = get_metadata_schema_for_scan(self._scan_name)
self._additional_metadata.data_updated.connect(self.validate_form)
super().__init__(parent=parent, metadata_model=self._md_schema, client=client, **kwargs)

View File

@@ -1,15 +0,0 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.editors.web_console.web_console_plugin import WebConsolePlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(WebConsolePlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -1,230 +0,0 @@
from __future__ import annotations
import secrets
import subprocess
import time
from bec_lib.logger import bec_logger
from louie.saferef import safe_ref
from qtpy.QtCore import QUrl, qInstallMessageHandler
from qtpy.QtWebEngineWidgets import QWebEnginePage, QWebEngineView
from qtpy.QtWidgets import QApplication, QVBoxLayout, QWidget
from bec_widgets.utils.bec_widget import BECWidget
logger = bec_logger.logger
class WebConsoleRegistry:
"""
A registry for the WebConsole class to manage its instances.
"""
def __init__(self):
"""
Initialize the registry.
"""
self._instances = {}
self._server_process = None
self._server_port = None
self._token = secrets.token_hex(16)
def register(self, instance: WebConsole):
"""
Register an instance of WebConsole.
"""
self._instances[instance.gui_id] = safe_ref(instance)
self.cleanup()
if self._server_process is None:
# Start the ttyd server if not already running
self.start_ttyd()
def start_ttyd(self, use_zsh: bool | None = None):
"""
Start the ttyd server
ttyd -q -W -t 'theme={"background": "black"}' zsh
Args:
use_zsh (bool): Whether to use zsh or bash. If None, it will try to detect if zsh is available.
"""
# First, check if ttyd is installed
try:
subprocess.run(["ttyd", "--version"], check=True, stdout=subprocess.PIPE)
except FileNotFoundError:
# pylint: disable=raise-missing-from
raise RuntimeError("ttyd is not installed. Please install it first.")
if use_zsh is None:
# Check if we can use zsh
try:
subprocess.run(["zsh", "--version"], check=True, stdout=subprocess.PIPE)
use_zsh = True
except FileNotFoundError:
use_zsh = False
command = [
"ttyd",
"-p",
"0",
"-W",
"-t",
'theme={"background": "black"}',
"-c",
f"user:{self._token}",
]
if use_zsh:
command.append("zsh")
else:
command.append("bash")
# Start the ttyd server
self._server_process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
self._wait_for_server_port()
self._server_process.stdout.close()
self._server_process.stderr.close()
def _wait_for_server_port(self, timeout: float = 10):
"""
Wait for the ttyd server to start and get the port number.
Args:
timeout (float): The timeout in seconds to wait for the server to start.
"""
start_time = time.time()
while True:
output = self._server_process.stderr.readline()
if output == b"" and self._server_process.poll() is not None:
break
if not output:
continue
output = output.decode("utf-8").strip()
if "Listening on" in output:
# Extract the port number from the output
self._server_port = int(output.split(":")[-1])
logger.info(f"ttyd server started on port {self._server_port}")
break
if time.time() - start_time > timeout:
raise TimeoutError(
"Timeout waiting for ttyd server to start. Please check if ttyd is installed and available in your PATH."
)
def cleanup(self):
"""
Clean up the registry by removing any instances that are no longer valid.
"""
for gui_id, weak_ref in list(self._instances.items()):
if weak_ref() is None:
del self._instances[gui_id]
if not self._instances and self._server_process:
# If no instances are left, terminate the server process
self._server_process.terminate()
self._server_process = None
self._server_port = None
logger.info("ttyd server terminated")
def unregister(self, instance: WebConsole):
"""
Unregister an instance of WebConsole.
Args:
instance (WebConsole): The instance to unregister.
"""
if instance.gui_id in self._instances:
del self._instances[instance.gui_id]
self.cleanup()
_web_console_registry = WebConsoleRegistry()
def suppress_qt_messages(type_, context, msg):
if context.category in ["js", "default"]:
return
print(msg)
qInstallMessageHandler(suppress_qt_messages)
class BECWebEnginePage(QWebEnginePage):
def javaScriptConsoleMessage(self, level, message, lineNumber, sourceID):
logger.info(f"[JS Console] {level.name} at line {lineNumber} in {sourceID}: {message}")
class WebConsole(BECWidget, QWidget):
"""
A simple widget to display a website
"""
PLUGIN = True
ICON_NAME = "terminal"
def __init__(self, parent=None, config=None, client=None, gui_id=None, **kwargs):
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
_web_console_registry.register(self)
self._token = _web_console_registry._token
layout = QVBoxLayout()
layout.setContentsMargins(0, 0, 0, 0)
self.browser = QWebEngineView(self)
self.page = BECWebEnginePage(self)
self.page.authenticationRequired.connect(self._authenticate)
self.browser.setPage(self.page)
layout.addWidget(self.browser)
self.setLayout(layout)
self.page.setUrl(QUrl(f"http://localhost:{_web_console_registry._server_port}"))
def write(self, data: str, send_return: bool = True):
"""
Send data to the web page
"""
self.page.runJavaScript(f"window.term.paste('{data}');")
if send_return:
self.send_return()
def _authenticate(self, _, auth):
"""
Authenticate the request with the provided username and password.
"""
auth.setUser("user")
auth.setPassword(self._token)
def send_return(self):
"""
Send return to the web page
"""
self.page.runJavaScript(
"document.querySelector('textarea.xterm-helper-textarea').dispatchEvent(new KeyboardEvent('keypress', {charCode: 13}))"
)
def send_ctrl_c(self):
"""
Send Ctrl+C to the web page
"""
self.page.runJavaScript(
"document.querySelector('textarea.xterm-helper-textarea').dispatchEvent(new KeyboardEvent('keypress', {charCode: 3}))"
)
def cleanup(self):
"""
Clean up the registry by removing any instances that are no longer valid.
"""
_web_console_registry.unregister(self)
super().cleanup()
if __name__ == "__main__": # pragma: no cover
import sys
app = QApplication(sys.argv)
widget = WebConsole()
widget.show()
sys.exit(app.exec_())

View File

@@ -1 +0,0 @@
{'files': ['web_console.py']}

View File

@@ -1,54 +0,0 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.editors.web_console.web_console import WebConsole
DOM_XML = """
<ui language='c++'>
<widget class='WebConsole' name='web_console'>
</widget>
</ui>
"""
class WebConsolePlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
t = WebConsole(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return "BEC Console"
def icon(self):
return designer_material_icon(WebConsole.ICON_NAME)
def includeFile(self):
return "web_console"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "WebConsole"
def toolTip(self):
return ""
def whatsThis(self):
return self.toolTip()

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "bec_widgets"
version = "2.4.2"
version = "2.3.0"
description = "BEC Widgets"
requires-python = ">=3.10"
classifiers = [
@@ -71,7 +71,7 @@ include_trailing_comma = true
known_first_party = ["bec_widgets"]
[tool.semantic_release]
build_command = "pip install build wheel && python -m build"
build_command = "python -m build"
version_toml = ["pyproject.toml:project.version"]
[tool.semantic_release.commit_author]
@@ -97,11 +97,11 @@ default_bump_level = 0
[tool.semantic_release.remote]
name = "origin"
type = "github"
ignore_token_for_push = true
type = "gitlab"
ignore_token_for_push = false
[tool.semantic_release.remote.token]
env = "GH_TOKEN"
env = "GL_TOKEN"
[tool.semantic_release.publish]
dist_glob_patterns = ["dist/*"]
@@ -109,7 +109,7 @@ upload_to_vcs_release = true
[tool.coverage.report]
skip_empty = true # exclude empty *files*, e.g. __init__.py, from the report
exclude_lines = [
exclude_also = [ # Exclude lines matching these regexes from the coverage report
"pragma: no cover",
"if TYPE_CHECKING:",
"return NotImplemented",

View File

@@ -96,10 +96,6 @@ def test_available_widgets(qtbot, connected_client_gui_obj):
if object_name == "VSCodeEditor":
continue
# Skip WebConsole as ttyd is not installed
if object_name == "WebConsole":
continue
#############################
######### Add widget ########
#############################

View File

@@ -7,7 +7,6 @@ from qtpy.QtWidgets import QWidget
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import (
BECDeviceFilter,
DeviceInputBase,
DeviceInputConfig,
)
from .client_mocks import mocked_client
@@ -52,13 +51,9 @@ def test_device_input_base_init_with_config(mocked_client):
"default": "samx",
}
widget = DeviceInputWidget(client=mocked_client, config=config)
widget2 = DeviceInputWidget(
client=mocked_client, config=DeviceInputConfig.model_validate(config)
)
for w in [widget, widget2]:
assert w.config.gui_id == "test_gui_id"
assert w.config.device_filter == ["Positioner"]
assert w.config.default == "samx"
assert widget.config.gui_id == "test_gui_id"
assert widget.config.device_filter == ["Positioner"]
assert widget.config.default == "samx"
def test_device_input_base_set_device_filter(device_input_base):

View File

@@ -518,156 +518,3 @@ def test_long_pressbutton(toolbar_fixture, dummy_widget, switchable_toolbar_acti
# Verify that fake_showMenu() was called.
assert call_flag, "Long press did not trigger showMenu() as expected."
# Additional tests for action/bundle removal
def test_remove_standalone_action(toolbar_fixture, icon_action, dummy_widget):
"""
Ensure that a standalone action is fully removed and no longer accessible.
"""
toolbar = toolbar_fixture
# Add the action and check it is present
toolbar.add_action("icon_action", icon_action, dummy_widget)
assert "icon_action" in toolbar.widgets
assert icon_action.action in toolbar.actions()
# Now remove it
toolbar.remove_action("icon_action")
# Action bookkeeping
assert "icon_action" not in toolbar.widgets
# QAction list
assert icon_action.action not in toolbar.actions()
# Attempting to hide / show it should raise
with pytest.raises(ValueError):
toolbar.hide_action("icon_action")
with pytest.raises(ValueError):
toolbar.show_action("icon_action")
def test_remove_action_from_bundle(
toolbar_fixture, dummy_widget, icon_action, material_icon_action
):
"""
Remove a single action that is part of a bundle and verify cleanup.
"""
toolbar = toolbar_fixture
bundle = ToolbarBundle(
bundle_id="test_bundle",
actions=[("icon_action", icon_action), ("material_action", material_icon_action)],
)
toolbar.add_bundle(bundle, dummy_widget)
# Initial assertions
assert "test_bundle" in toolbar.bundles
assert "icon_action" in toolbar.widgets
assert "material_action" in toolbar.widgets
# Remove one action from the bundle
toolbar.remove_action("icon_action")
# icon_action should be fully gone
assert "icon_action" not in toolbar.widgets
assert icon_action.action not in toolbar.actions()
# Bundle tracking should be updated
assert "icon_action" not in toolbar.bundles["test_bundle"]
# The other action must still exist
assert "material_action" in toolbar.widgets
assert material_icon_action.action in toolbar.actions()
def test_remove_last_action_from_bundle_removes_bundle(toolbar_fixture, dummy_widget, icon_action):
"""
Removing the final action from a bundle should delete the bundle entry itself.
"""
toolbar = toolbar_fixture
bundle = ToolbarBundle(bundle_id="single_action_bundle", actions=[("only_action", icon_action)])
toolbar.add_bundle(bundle, dummy_widget)
# Sanity check
assert "single_action_bundle" in toolbar.bundles
assert "only_action" in toolbar.widgets
# Remove the sole action
toolbar.remove_action("only_action")
# Bundle should be gone
assert "single_action_bundle" not in toolbar.bundles
# QAction removed
assert icon_action.action not in toolbar.actions()
def test_remove_entire_bundle(toolbar_fixture, dummy_widget, icon_action, material_icon_action):
"""
Ensure that removing a bundle deletes all its actions and separators.
"""
toolbar = toolbar_fixture
bundle = ToolbarBundle(
bundle_id="to_remove",
actions=[("icon_action", icon_action), ("material_action", material_icon_action)],
)
toolbar.add_bundle(bundle, dummy_widget)
# Confirm bundle presence
assert "to_remove" in toolbar.bundles
# Remove the whole bundle
toolbar.remove_bundle("to_remove")
# Bundle mapping gone
assert "to_remove" not in toolbar.bundles
# All actions gone
for aid, act in [("icon_action", icon_action), ("material_action", material_icon_action)]:
assert aid not in toolbar.widgets
assert act.action not in toolbar.actions()
def test_trigger_removed_action_raises(toolbar_fixture, icon_action, dummy_widget, qtbot):
"""
Add an action, connect a mock slot, then remove the action and verify that
attempting to trigger it afterwards raises RuntimeError (since the underlying
QAction has been deleted).
"""
toolbar = toolbar_fixture
# Add the action and connect a mock slot
toolbar.add_action("icon_action", icon_action, dummy_widget)
called = []
def mock_slot():
called.append(True)
icon_action.action.triggered.connect(mock_slot)
# Trigger once to confirm connection works
icon_action.action.trigger()
assert called == [True]
# Now remove the action
toolbar.remove_action("icon_action")
# Allow deleteLater event to process
qtbot.wait(50)
# The underlying C++ object should be deleted; triggering should raise
with pytest.raises(RuntimeError):
icon_action.action.trigger()
def test_remove_nonexistent_action(toolbar_fixture):
"""
Attempting to remove an action that does not exist should raise ValueError.
"""
toolbar = toolbar_fixture
with pytest.raises(ValueError) as excinfo:
toolbar.remove_action("nonexistent_action")
assert "Action with ID 'nonexistent_action' does not exist." in str(excinfo.value)
def test_remove_nonexistent_bundle(toolbar_fixture):
"""
Attempting to remove a bundle that does not exist should raise ValueError.
"""
toolbar = toolbar_fixture
with pytest.raises(ValueError) as excinfo:
toolbar.remove_bundle("nonexistent_bundle")
assert "Bundle 'nonexistent_bundle' does not exist." in str(excinfo.value)

View File

@@ -1,11 +1,9 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_lib.messages import AvailableResourceMessage, ScanQueueHistoryMessage, ScanQueueMessage
from qtpy.QtCore import QModelIndex, QPoint, Qt
from bec_widgets.utils.forms_from_types.items import StrMetadataField
from bec_widgets.utils.widget_io import WidgetIO
@@ -542,29 +540,6 @@ def test_get_scan_parameters_from_redis(scan_control, mocked_client):
assert kwargs == {"steps": 10, "relative": False, "exp_time": 2.0, "burst_at_each_point": 1}
TEST_MD = {"sample_name": "Test Sample", "test key 1": "test value 1", "test key 2": "test value 2"}
TEST_TABLE_ENTRY = [["test key 1", "test value 1"], ["test key 2", "test value 2"]]
def test_scan_metadata_is_updated_even_without_default_form_changes(
scan_control: ScanControl, qtbot
):
assert scan_control._metadata_form._scan_name == "line_scan"
scan_control.comboBox_scan_selection.setCurrentText("grid_scan")
assert scan_control._metadata_form._scan_name == "grid_scan"
scan_control._metadata_form._additional_metadata._add_button.click()
qtbot.wait(100)
table_model = scan_control._metadata_form._additional_metadata._table_model
model_key = table_model.index(0, 0, QModelIndex())
table_model.setData(model_key, "test key 1", Qt.EditRole)
model_value = model_key.siblingAtColumn(1)
table_model.setData(model_value, "test value 1", Qt.EditRole)
assert scan_control._metadata_form._additional_metadata.dump_dict() == {
"test key 1": "test value 1"
}
assert scan_control._scan_metadata == {"sample_name": "", "test key 1": "test value 1"}
def test_scan_metadata_is_connected(scan_control):
assert scan_control._metadata_form._scan_name == "line_scan"
scan_control.comboBox_scan_selection.setCurrentText("grid_scan")
@@ -573,28 +548,16 @@ def test_scan_metadata_is_connected(scan_control):
assert isinstance(sample_name, StrMetadataField)
sample_name._main_widget.setText("Test Sample")
scan_control._metadata_form._additional_metadata._table_model._data = TEST_TABLE_ENTRY
scan_control._metadata_form._additional_metadata._table_model._data = [
["test key 1", "test value 1"],
["test key 2", "test value 2"],
]
scan_control._metadata_form.validate_form()
assert scan_control._scan_metadata == TEST_MD
def test_scan_metadata_is_passed_to_scan_function(scan_control: ScanControl):
scan_control.comboBox_scan_selection.setCurrentText("grid_scan")
sample_name = scan_control._metadata_form._form_grid.layout().itemAtPosition(0, 1).widget()
sample_name._main_widget.setText("Test Sample")
scan_control._metadata_form._additional_metadata._table_model._data = TEST_TABLE_ENTRY
scan_control._metadata_form.validate_form()
assert scan_control._scan_metadata == TEST_MD
scans = SimpleNamespace(grid_scan=MagicMock())
with (
patch.object(scan_control, "scans", scans),
patch.object(scan_control, "get_scan_parameters", lambda: ((), {})),
):
scan_control.run_scan()
scans.grid_scan.assert_called_once_with(metadata=TEST_MD)
assert scan_control._scan_metadata == {
"sample_name": "Test Sample",
"test key 1": "test value 1",
"test key 2": "test value 2",
}
def test_restore_parameters_with_fewer_arg_bundles(scan_control, qtbot):

View File

@@ -1,90 +0,0 @@
from unittest import mock
import pytest
from qtpy.QtNetwork import QAuthenticator
from bec_widgets.widgets.editors.web_console.web_console import WebConsole, _web_console_registry
from .client_mocks import mocked_client
@pytest.fixture
def console_widget(qtbot, mocked_client):
with mock.patch(
"bec_widgets.widgets.editors.web_console.web_console.subprocess"
) as mock_subprocess:
with mock.patch.object(_web_console_registry, "_wait_for_server_port"):
_web_console_registry._server_port = 12345
# Create the WebConsole widget
widget = WebConsole(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_web_console_widget_initialization(console_widget):
assert (
console_widget.page.url().toString()
== f"http://localhost:{_web_console_registry._server_port}"
)
def test_web_console_write(console_widget):
# Test the write method
with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js:
console_widget.write("Hello, World!")
assert mock.call("window.term.paste('Hello, World!');") in mock_run_js.mock_calls
def test_web_console_write_no_return(console_widget):
# Test the write method with send_return=False
with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js:
console_widget.write("Hello, World!", send_return=False)
assert mock.call("window.term.paste('Hello, World!');") in mock_run_js.mock_calls
assert mock_run_js.call_count == 1
def test_web_console_send_return(console_widget):
# Test the send_return method
with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js:
console_widget.send_return()
script = mock_run_js.call_args[0][0]
assert "new KeyboardEvent('keypress', {charCode: 13})" in script
assert mock_run_js.call_count == 1
def test_web_console_send_ctrl_c(console_widget):
# Test the send_ctrl_c method
with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js:
console_widget.send_ctrl_c()
script = mock_run_js.call_args[0][0]
assert "new KeyboardEvent('keypress', {charCode: 3})" in script
assert mock_run_js.call_count == 1
def test_web_console_authenticate(console_widget):
# Test the _authenticate method
token = _web_console_registry._token
mock_auth = mock.MagicMock(spec=QAuthenticator)
console_widget._authenticate(None, mock_auth)
mock_auth.setUser.assert_called_once_with("user")
mock_auth.setPassword.assert_called_once_with(token)
def test_web_console_registry_wait_for_server_port():
# Test the _wait_for_server_port method
with mock.patch.object(_web_console_registry, "_server_process") as mock_subprocess:
mock_subprocess.stderr.readline.side_effect = [b"Starting", b"Listening on port: 12345"]
_web_console_registry._wait_for_server_port()
assert _web_console_registry._server_port == 12345
def test_web_console_registry_wait_for_server_port_timeout():
# Test the _wait_for_server_port method with timeout
with mock.patch.object(_web_console_registry, "_server_process") as mock_subprocess:
with pytest.raises(TimeoutError):
_web_console_registry._wait_for_server_port(timeout=0.1)