1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-11 19:20:53 +02:00

Compare commits

...

24 Commits

Author SHA1 Message Date
semantic-release
40b5081632 2.4.3
Automatically generated by python-semantic-release
2025-05-19 15:25:35 +00:00
f064baae68 fix: twine upload key 2025-05-19 17:24:55 +02:00
semantic-release
58f01fb3a2 2.4.2
Automatically generated by python-semantic-release
2025-05-19 15:04:48 +00:00
1e344eacb7 fix: push release using GH_token 2025-05-19 17:04:04 +02:00
semantic-release
34002fa51a 2.4.1
Automatically generated by python-semantic-release
2025-05-19 14:34:58 +00:00
a00d510a75 fix: skip actions on new tags 2025-05-19 16:34:19 +02:00
semantic-release
120faf9523 2.4.0
Automatically generated by python-semantic-release
2025-05-19 13:53:08 +00:00
d7bd61f69e ci: use custom semver action 2025-05-19 15:50:24 +02:00
94bcfff724 ci: add known hosts 2025-05-19 15:10:38 +02:00
a17e7a0d52 ci: add deploy ssh key to release job 2025-05-19 15:02:54 +02:00
7f67d28887 ci: use ssh key for push 2025-05-19 14:39:01 +02:00
52d8e4b332 ci: build with ssh key 2025-05-19 14:26:17 +02:00
dea2b44e6a ci: fix job permissions for release 2025-05-19 13:47:25 +02:00
dc70ea6dfb ci: fix missing build dependencies 2025-05-19 13:32:16 +02:00
133ddda3e3 ci: fix missing build dependencies 2025-05-19 13:16:29 +02:00
8eee92e5cf ci: add semantic-release job 2025-05-19 12:57:17 +02:00
Klaus Wakonig
85de24aa89 chore: update issue templates 2025-05-17 20:38:32 +02:00
56b6a0b8c2 feat: add web console 2025-05-17 13:34:21 +02:00
d579d894f0 feat(modular_toolbar): remove action/bundle by id 2025-05-17 09:55:00 +02:00
d915d2f507 fix: (#612) fix additional MD form
makes sure the form is validated on any changes of the additional
metadata table model so that they are propagated to the scan control
widget even when nothing is entered in the standard form
2025-05-16 14:37:07 +02:00
7d7a88669f fix: (#572) signal input base filter
use name attribute rather than value from Kind, to compare with kind_str
2025-05-16 10:50:27 +02:00
a42dcec6d4 fix(entry_validator): device signals retrieved from ._info instead of .describe(), close #570 2025-05-15 15:33:00 +02:00
8cf1f09926 ci: exclude test dir from coverage report 2025-05-15 11:35:45 +02:00
83b153a14a ci: include lines with >=3 characters in report 2025-05-15 09:52:37 +02:00
23 changed files with 2641 additions and 1605 deletions

26
.github/ISSUE_TEMPLATE/bug_report.md vendored Normal file
View File

@@ -0,0 +1,26 @@
---
name: Bug report
about: Create a report to help us improve
title: ''
labels: ''
assignees: ''
---
## Bug report
## Summary
[Provide a brief description of the bug.]
## Expected Behavior vs Actual Behavior
[Describe what you expected to happen and what actually happened.]
## Steps to Reproduce
[Outline the steps that lead to the bug's occurrence. Be specific and provide a clear sequence of actions.]
## Related Issues
[Paste links to any related issues or feature requests.]

View File

@@ -0,0 +1,48 @@
---
name: Feature request
about: Suggest an idea for this project
title: ''
labels: ''
assignees: ''
---
## Feature Summary
[Provide a brief and clear summary of the new feature you are requesting]
## Problem Description
[Explain the problem or need that this feature aims to address. Be specific about the issues or gaps in the current functionality]
## Use Case
[Describe a real-world scenario or use case where this feature would be beneficial. Explain how it would improve the user experience or workflow]
## Proposed Solution
[If you have a specific solution in mind, describe it here. Explain how it would work and how it would address the problem described above]
## Benefits
[Explain the benefits and advantages of implementing this feature. Highlight how it adds value to the product or improves user satisfaction]
## Alternatives Considered
[If you've considered alternative solutions or workarounds, mention them here. Explain why the proposed feature is the preferred option]
## Impact on Existing Functionality
[Discuss how the new feature might impact or interact with existing features. Address any potential conflicts or dependencies]
## Priority
[Assign a priority level to the feature request based on its importance. Use a scale such as Low, Medium, High]
## Attachments
[Include any relevant attachments, such as sketches, diagrams, or references that can help the development team understand your feature request better]
## Additional Information
[Provide any additional information that might be relevant to the feature request, such as user feedback, market trends, or similar features in other products]

View File

@@ -14,7 +14,7 @@ jobs:
steps:
- uses: actions/github-script@v7
id: script
if: github.event_name == 'push'
if: github.event_name == 'push' && github.event.ref_type != 'tag'
with:
script: |
const prs = await github.rest.pulls.list({

View File

@@ -55,9 +55,7 @@ jobs:
- name: Run Pytest with Coverage
id: coverage
run: |
pip install coverage pytest pytest-random-order pytest-cov
pytest --random-order --cov --cov-config=pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail tests/unit_tests/
run: pytest --random-order --cov=bec_widgets --cov-config=pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail tests/unit_tests/
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5

103
.github/workflows/semantic_release.yml vendored Normal file
View File

@@ -0,0 +1,103 @@
name: Continuous Delivery
on:
push:
branches:
- main
# default: least privileged permissions across all jobs
permissions:
contents: read
jobs:
release:
runs-on: ubuntu-latest
concurrency:
group: ${{ github.workflow }}-release-${{ github.ref_name }}
cancel-in-progress: false
env:
CHILD_PIPELINE_BRANCH: main # Set the branch you want for ophyd_devices
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
PROJECT_PATH: ${{ github.repository }}
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
permissions:
contents: write
steps:
# Note: We checkout the repository at the branch that triggered the workflow
# with the entire history to ensure to match PSR's release branch detection
# and history evaluation.
# However, we forcefully reset the branch to the workflow sha because it is
# possible that the branch was updated while the workflow was running. This
# prevents accidentally releasing un-evaluated changes.
- name: Setup | Checkout Repository on Release Branch
uses: actions/checkout@v4
with:
ref: ${{ github.ref_name }}
fetch-depth: 0
ssh-key: ${{ secrets.CI_DEPLOY_SSH_KEY }}
ssh-known-hosts: ${{ secrets.CI_DEPLOY_SSH_KNOWN_HOSTS }}
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Setup | Force release branch to be at workflow sha
run: |
git reset --hard ${{ github.sha }}
- name: Evaluate | Verify upstream has NOT changed
# Last chance to abort before causing an error as another PR/push was applied to
# the upstream branch while this workflow was running. This is important
# because we are committing a version change (--commit). You may omit this step
# if you have 'commit: false' in your configuration.
#
# You may consider moving this to a repo script and call it from this step instead
# of writing it in-line.
shell: bash
run: |
set +o pipefail
UPSTREAM_BRANCH_NAME="$(git status -sb | head -n 1 | cut -d' ' -f2 | grep -E '\.{3}' | cut -d'.' -f4)"
printf '%s\n' "Upstream branch name: $UPSTREAM_BRANCH_NAME"
set -o pipefail
if [ -z "$UPSTREAM_BRANCH_NAME" ]; then
printf >&2 '%s\n' "::error::Unable to determine upstream branch name!"
exit 1
fi
git fetch "${UPSTREAM_BRANCH_NAME%%/*}"
if ! UPSTREAM_SHA="$(git rev-parse "$UPSTREAM_BRANCH_NAME")"; then
printf >&2 '%s\n' "::error::Unable to determine upstream branch sha!"
exit 1
fi
HEAD_SHA="$(git rev-parse HEAD)"
if [ "$HEAD_SHA" != "$UPSTREAM_SHA" ]; then
printf >&2 '%s\n' "[HEAD SHA] $HEAD_SHA != $UPSTREAM_SHA [UPSTREAM SHA]"
printf >&2 '%s\n' "::error::Upstream has changed, aborting release..."
exit 1
fi
printf '%s\n' "Verified upstream branch has not changed, continuing with release..."
- name: Semantic Version Release
id: release
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
pip install python-semantic-release==9.* wheel build twine
semantic-release -vv version
if [ ! -d dist ]; then echo No release will be made; exit 0; fi
twine upload dist/* -u __token__ -p ${{ secrets.CI_PYPI_TOKEN }} --skip-existing
semantic-release publish

File diff suppressed because it is too large Load Diff

View File

@@ -55,6 +55,7 @@ _Widgets = {
"TextBox": "TextBox",
"VSCodeEditor": "VSCodeEditor",
"Waveform": "Waveform",
"WebConsole": "WebConsole",
"WebsiteWidget": "WebsiteWidget",
}
@@ -3501,6 +3502,16 @@ class Waveform(RPCBase):
"""
class WebConsole(RPCBase):
"""A simple widget to display a website"""
@rpc_call
def remove(self):
"""
Cleanup the BECConnector
"""
class WebsiteWidget(RPCBase):
"""A simple widget to display a website"""

View File

@@ -96,9 +96,9 @@ class FakePositioner(BECPositioner):
}
self._info = {
"signals": {
"readback": {"kind_str": "5"}, # hinted
"setpoint": {"kind_str": "1"}, # normal
"velocity": {"kind_str": "2"}, # config
"readback": {"kind_str": "hinted"}, # hinted
"setpoint": {"kind_str": "normal"}, # normal
"velocity": {"kind_str": "config"}, # config
}
}
self.signals = {

View File

@@ -17,13 +17,23 @@ class EntryValidator:
raise ValueError(f"Device '{name}' not found in current BEC session")
device = self.devices[name]
description = device.describe()
# Build list of available signal entries from device._info['signals']
signals_dict = getattr(device, "_info", {}).get("signals", {})
available_entries = [
sig.get("obj_name") for sig in signals_dict.values() if sig.get("obj_name")
]
# If no signals are found, means device is a signal, use the device name as the entry
if not available_entries:
available_entries = [name]
if entry is None or entry == "":
entry = next(iter(device._hints), name) if hasattr(device, "_hints") else name
if entry not in description:
if entry not in available_entries:
raise ValueError(
f"Entry '{entry}' not found in device '{name}' signals. Available signals: {description.keys()}"
f"Entry '{entry}' not found in device '{name}' signals. "
f"Available signals: '{available_entries}'"
)
return entry

View File

@@ -702,6 +702,85 @@ class ModularToolBar(QToolBar):
self.bundles[bundle_id].append(action_id)
self.update_separators()
def remove_action(self, action_id: str):
"""
Completely remove a single action from the toolbar.
The method takes care of both standalone actions and actions that are
part of an existing bundle.
Args:
action_id (str): Unique identifier for the action.
"""
if action_id not in self.widgets:
raise ValueError(f"Action with ID '{action_id}' does not exist.")
# Identify potential bundle membership
parent_bundle = None
for b_id, a_ids in self.bundles.items():
if action_id in a_ids:
parent_bundle = b_id
break
# 1. Remove the QAction from the QToolBar and delete it
tool_action = self.widgets.pop(action_id)
if hasattr(tool_action, "action") and tool_action.action is not None:
self.removeAction(tool_action.action)
tool_action.action.deleteLater()
# 2. Clean bundle bookkeeping if the action belonged to one
if parent_bundle:
self.bundles[parent_bundle].remove(action_id)
# If the bundle becomes empty, get rid of the bundle entry as well
if not self.bundles[parent_bundle]:
self.remove_bundle(parent_bundle)
# 3. Remove from the ordering list
self.toolbar_items = [
item
for item in self.toolbar_items
if not (item[0] == "action" and item[1] == action_id)
]
self.update_separators()
def remove_bundle(self, bundle_id: str):
"""
Remove an entire bundle (and all of its actions) from the toolbar.
Args:
bundle_id (str): Unique identifier for the bundle.
"""
if bundle_id not in self.bundles:
raise ValueError(f"Bundle '{bundle_id}' does not exist.")
# Remove every action belonging to this bundle
for action_id in list(self.bundles[bundle_id]): # copy the list
if action_id in self.widgets:
tool_action = self.widgets.pop(action_id)
if hasattr(tool_action, "action") and tool_action.action is not None:
self.removeAction(tool_action.action)
tool_action.action.deleteLater()
# Drop the bundle entry
self.bundles.pop(bundle_id, None)
# Remove bundle entry and its preceding separator (if any) from the ordering list
cleaned_items = []
skip_next_separator = False
for item_type, ident in self.toolbar_items:
if item_type == "bundle" and ident == bundle_id:
# mark to skip one following separator if present
skip_next_separator = True
continue
if skip_next_separator and item_type == "separator":
skip_next_separator = False
continue
cleaned_items.append((item_type, ident))
self.toolbar_items = cleaned_items
self.update_separators()
def contextMenuEvent(self, event):
"""
Overrides the context menu event to show toolbar actions with checkboxes and icons.

View File

@@ -36,14 +36,16 @@ class DeviceSignalInputBase(BECWidget):
Kind.config: "include_config_signals",
}
def __init__(self, client=None, config=None, gui_id: str = None, **kwargs):
if config is None:
config = DeviceSignalInputBaseConfig(widget_class=self.__class__.__name__)
else:
if isinstance(config, dict):
config = DeviceSignalInputBaseConfig(**config)
self.config = config
super().__init__(client=client, config=config, gui_id=gui_id, **kwargs)
def __init__(
self,
client=None,
config: DeviceSignalInputBaseConfig | dict | None = None,
gui_id: str = None,
**kwargs,
):
self.config = self._process_config_input(config)
super().__init__(client=client, config=self.config, gui_id=gui_id, **kwargs)
self._device = None
self.get_bec_shortcuts()
@@ -102,10 +104,7 @@ class DeviceSignalInputBase(BECWidget):
"""
self.config.signal_filter = self.signal_filter
# pylint: disable=protected-access
self._hinted_signals = []
self._normal_signals = []
self._config_signals = []
if self.validate_device(self._device) is False:
if not self.validate_device(self._device):
self._device = None
self.config.device = self._device
return
@@ -116,27 +115,19 @@ class DeviceSignalInputBase(BECWidget):
FilterIO.set_selection(widget=self, selection=[self._device])
return
device_info = device._info["signals"]
if Kind.hinted in self.signal_filter:
hinted_signals = [
def _update(kind: Kind):
return [
signal
for signal, signal_info in device_info.items()
if (signal_info.get("kind_str", None) == str(Kind.hinted.value))
if kind in self.signal_filter
and (signal_info.get("kind_str", None) == str(kind.name))
]
self._hinted_signals = hinted_signals
if Kind.normal in self.signal_filter:
normal_signals = [
signal
for signal, signal_info in device_info.items()
if (signal_info.get("kind_str", None) == str(Kind.normal.value))
]
self._normal_signals = normal_signals
if Kind.config in self.signal_filter:
config_signals = [
signal
for signal, signal_info in device_info.items()
if (signal_info.get("kind_str", None) == str(Kind.config.value))
]
self._config_signals = config_signals
self._hinted_signals = _update(Kind.hinted)
self._normal_signals = _update(Kind.normal)
self._config_signals = _update(Kind.config)
self._signals = self._hinted_signals + self._normal_signals + self._config_signals
FilterIO.set_selection(widget=self, selection=self.signals)
@@ -279,3 +270,8 @@ class DeviceSignalInputBase(BECWidget):
if signal in self.signals:
return True
return False
def _process_config_input(self, config: DeviceSignalInputBaseConfig | dict | None):
if config is None:
return DeviceSignalInputBaseConfig(widget_class=self.__class__.__name__)
return DeviceSignalInputBaseConfig.model_validate(config)

View File

@@ -53,6 +53,7 @@ class DictBackedTableModel(QAbstractTableModel):
if value in self._disallowed_keys or value in self._other_keys(index.row()):
return False
self._data[index.row()][index.column()] = str(value)
self.dataChanged.emit(index, index)
return True
return False
@@ -109,6 +110,7 @@ class DictBackedTableModel(QAbstractTableModel):
class DictBackedTable(QWidget):
delete_rows = Signal(list)
data_updated = Signal()
def __init__(self, initial_data: list[list[str]]):
"""Widget which uses a DictBackedTableModel to display an editable table
@@ -141,6 +143,11 @@ class DictBackedTable(QWidget):
self._add_button.clicked.connect(self._table_model.add_row)
self._remove_button.clicked.connect(self.delete_selected_rows)
self.delete_rows.connect(self._table_model.delete_rows)
self._table_model.dataChanged.connect(self._emit_data_updated)
def _emit_data_updated(self, *args, **kwargs):
"""Just to swallow the args"""
self.data_updated.emit()
def delete_selected_rows(self):
"""Delete rows which are part of the selection model"""

View File

@@ -43,6 +43,7 @@ class ScanMetadata(PydanticModelForm):
self._additional_metadata = DictBackedTable(initial_extras or [])
self._scan_name = scan_name or ""
self._md_schema = get_metadata_schema_for_scan(self._scan_name)
self._additional_metadata.data_updated.connect(self.validate_form)
super().__init__(parent=parent, metadata_model=self._md_schema, client=client, **kwargs)

View File

@@ -0,0 +1,15 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.editors.web_console.web_console_plugin import WebConsolePlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(WebConsolePlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -0,0 +1,230 @@
from __future__ import annotations
import secrets
import subprocess
import time
from bec_lib.logger import bec_logger
from louie.saferef import safe_ref
from qtpy.QtCore import QUrl, qInstallMessageHandler
from qtpy.QtWebEngineWidgets import QWebEnginePage, QWebEngineView
from qtpy.QtWidgets import QApplication, QVBoxLayout, QWidget
from bec_widgets.utils.bec_widget import BECWidget
logger = bec_logger.logger
class WebConsoleRegistry:
"""
A registry for the WebConsole class to manage its instances.
"""
def __init__(self):
"""
Initialize the registry.
"""
self._instances = {}
self._server_process = None
self._server_port = None
self._token = secrets.token_hex(16)
def register(self, instance: WebConsole):
"""
Register an instance of WebConsole.
"""
self._instances[instance.gui_id] = safe_ref(instance)
self.cleanup()
if self._server_process is None:
# Start the ttyd server if not already running
self.start_ttyd()
def start_ttyd(self, use_zsh: bool | None = None):
"""
Start the ttyd server
ttyd -q -W -t 'theme={"background": "black"}' zsh
Args:
use_zsh (bool): Whether to use zsh or bash. If None, it will try to detect if zsh is available.
"""
# First, check if ttyd is installed
try:
subprocess.run(["ttyd", "--version"], check=True, stdout=subprocess.PIPE)
except FileNotFoundError:
# pylint: disable=raise-missing-from
raise RuntimeError("ttyd is not installed. Please install it first.")
if use_zsh is None:
# Check if we can use zsh
try:
subprocess.run(["zsh", "--version"], check=True, stdout=subprocess.PIPE)
use_zsh = True
except FileNotFoundError:
use_zsh = False
command = [
"ttyd",
"-p",
"0",
"-W",
"-t",
'theme={"background": "black"}',
"-c",
f"user:{self._token}",
]
if use_zsh:
command.append("zsh")
else:
command.append("bash")
# Start the ttyd server
self._server_process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
self._wait_for_server_port()
self._server_process.stdout.close()
self._server_process.stderr.close()
def _wait_for_server_port(self, timeout: float = 10):
"""
Wait for the ttyd server to start and get the port number.
Args:
timeout (float): The timeout in seconds to wait for the server to start.
"""
start_time = time.time()
while True:
output = self._server_process.stderr.readline()
if output == b"" and self._server_process.poll() is not None:
break
if not output:
continue
output = output.decode("utf-8").strip()
if "Listening on" in output:
# Extract the port number from the output
self._server_port = int(output.split(":")[-1])
logger.info(f"ttyd server started on port {self._server_port}")
break
if time.time() - start_time > timeout:
raise TimeoutError(
"Timeout waiting for ttyd server to start. Please check if ttyd is installed and available in your PATH."
)
def cleanup(self):
"""
Clean up the registry by removing any instances that are no longer valid.
"""
for gui_id, weak_ref in list(self._instances.items()):
if weak_ref() is None:
del self._instances[gui_id]
if not self._instances and self._server_process:
# If no instances are left, terminate the server process
self._server_process.terminate()
self._server_process = None
self._server_port = None
logger.info("ttyd server terminated")
def unregister(self, instance: WebConsole):
"""
Unregister an instance of WebConsole.
Args:
instance (WebConsole): The instance to unregister.
"""
if instance.gui_id in self._instances:
del self._instances[instance.gui_id]
self.cleanup()
_web_console_registry = WebConsoleRegistry()
def suppress_qt_messages(type_, context, msg):
if context.category in ["js", "default"]:
return
print(msg)
qInstallMessageHandler(suppress_qt_messages)
class BECWebEnginePage(QWebEnginePage):
def javaScriptConsoleMessage(self, level, message, lineNumber, sourceID):
logger.info(f"[JS Console] {level.name} at line {lineNumber} in {sourceID}: {message}")
class WebConsole(BECWidget, QWidget):
"""
A simple widget to display a website
"""
PLUGIN = True
ICON_NAME = "terminal"
def __init__(self, parent=None, config=None, client=None, gui_id=None, **kwargs):
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
_web_console_registry.register(self)
self._token = _web_console_registry._token
layout = QVBoxLayout()
layout.setContentsMargins(0, 0, 0, 0)
self.browser = QWebEngineView(self)
self.page = BECWebEnginePage(self)
self.page.authenticationRequired.connect(self._authenticate)
self.browser.setPage(self.page)
layout.addWidget(self.browser)
self.setLayout(layout)
self.page.setUrl(QUrl(f"http://localhost:{_web_console_registry._server_port}"))
def write(self, data: str, send_return: bool = True):
"""
Send data to the web page
"""
self.page.runJavaScript(f"window.term.paste('{data}');")
if send_return:
self.send_return()
def _authenticate(self, _, auth):
"""
Authenticate the request with the provided username and password.
"""
auth.setUser("user")
auth.setPassword(self._token)
def send_return(self):
"""
Send return to the web page
"""
self.page.runJavaScript(
"document.querySelector('textarea.xterm-helper-textarea').dispatchEvent(new KeyboardEvent('keypress', {charCode: 13}))"
)
def send_ctrl_c(self):
"""
Send Ctrl+C to the web page
"""
self.page.runJavaScript(
"document.querySelector('textarea.xterm-helper-textarea').dispatchEvent(new KeyboardEvent('keypress', {charCode: 3}))"
)
def cleanup(self):
"""
Clean up the registry by removing any instances that are no longer valid.
"""
_web_console_registry.unregister(self)
super().cleanup()
if __name__ == "__main__": # pragma: no cover
import sys
app = QApplication(sys.argv)
widget = WebConsole()
widget.show()
sys.exit(app.exec_())

View File

@@ -0,0 +1 @@
{'files': ['web_console.py']}

View File

@@ -0,0 +1,54 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.editors.web_console.web_console import WebConsole
DOM_XML = """
<ui language='c++'>
<widget class='WebConsole' name='web_console'>
</widget>
</ui>
"""
class WebConsolePlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
t = WebConsole(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return "BEC Console"
def icon(self):
return designer_material_icon(WebConsole.ICON_NAME)
def includeFile(self):
return "web_console"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "WebConsole"
def toolTip(self):
return ""
def whatsThis(self):
return self.toolTip()

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "bec_widgets"
version = "2.3.0"
version = "2.4.3"
description = "BEC Widgets"
requires-python = ">=3.10"
classifiers = [
@@ -38,6 +38,7 @@ dev = [
"pytest-timeout~=2.2",
"pytest-xvfb~=3.0",
"pytest~=8.0",
"pytest-cov~=6.1.1",
]
[project.urls]
@@ -70,7 +71,7 @@ include_trailing_comma = true
known_first_party = ["bec_widgets"]
[tool.semantic_release]
build_command = "python -m build"
build_command = "pip install build wheel && python -m build"
version_toml = ["pyproject.toml:project.version"]
[tool.semantic_release.commit_author]
@@ -96,11 +97,11 @@ default_bump_level = 0
[tool.semantic_release.remote]
name = "origin"
type = "gitlab"
ignore_token_for_push = false
type = "github"
ignore_token_for_push = true
[tool.semantic_release.remote.token]
env = "GL_TOKEN"
env = "GH_TOKEN"
[tool.semantic_release.publish]
dist_glob_patterns = ["dist/*"]
@@ -113,6 +114,6 @@ exclude_lines = [
"if TYPE_CHECKING:",
"return NotImplemented",
"raise NotImplementedError",
"...",
"\\.\\.\\.",
'if __name__ == "__main__":',
]

View File

@@ -96,6 +96,10 @@ def test_available_widgets(qtbot, connected_client_gui_obj):
if object_name == "VSCodeEditor":
continue
# Skip WebConsole as ttyd is not installed
if object_name == "WebConsole":
continue
#############################
######### Add widget ########
#############################

View File

@@ -7,6 +7,7 @@ from qtpy.QtWidgets import QWidget
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import (
BECDeviceFilter,
DeviceInputBase,
DeviceInputConfig,
)
from .client_mocks import mocked_client
@@ -51,9 +52,13 @@ def test_device_input_base_init_with_config(mocked_client):
"default": "samx",
}
widget = DeviceInputWidget(client=mocked_client, config=config)
assert widget.config.gui_id == "test_gui_id"
assert widget.config.device_filter == ["Positioner"]
assert widget.config.default == "samx"
widget2 = DeviceInputWidget(
client=mocked_client, config=DeviceInputConfig.model_validate(config)
)
for w in [widget, widget2]:
assert w.config.gui_id == "test_gui_id"
assert w.config.device_filter == ["Positioner"]
assert w.config.default == "samx"
def test_device_input_base_set_device_filter(device_input_base):

View File

@@ -518,3 +518,156 @@ def test_long_pressbutton(toolbar_fixture, dummy_widget, switchable_toolbar_acti
# Verify that fake_showMenu() was called.
assert call_flag, "Long press did not trigger showMenu() as expected."
# Additional tests for action/bundle removal
def test_remove_standalone_action(toolbar_fixture, icon_action, dummy_widget):
"""
Ensure that a standalone action is fully removed and no longer accessible.
"""
toolbar = toolbar_fixture
# Add the action and check it is present
toolbar.add_action("icon_action", icon_action, dummy_widget)
assert "icon_action" in toolbar.widgets
assert icon_action.action in toolbar.actions()
# Now remove it
toolbar.remove_action("icon_action")
# Action bookkeeping
assert "icon_action" not in toolbar.widgets
# QAction list
assert icon_action.action not in toolbar.actions()
# Attempting to hide / show it should raise
with pytest.raises(ValueError):
toolbar.hide_action("icon_action")
with pytest.raises(ValueError):
toolbar.show_action("icon_action")
def test_remove_action_from_bundle(
toolbar_fixture, dummy_widget, icon_action, material_icon_action
):
"""
Remove a single action that is part of a bundle and verify cleanup.
"""
toolbar = toolbar_fixture
bundle = ToolbarBundle(
bundle_id="test_bundle",
actions=[("icon_action", icon_action), ("material_action", material_icon_action)],
)
toolbar.add_bundle(bundle, dummy_widget)
# Initial assertions
assert "test_bundle" in toolbar.bundles
assert "icon_action" in toolbar.widgets
assert "material_action" in toolbar.widgets
# Remove one action from the bundle
toolbar.remove_action("icon_action")
# icon_action should be fully gone
assert "icon_action" not in toolbar.widgets
assert icon_action.action not in toolbar.actions()
# Bundle tracking should be updated
assert "icon_action" not in toolbar.bundles["test_bundle"]
# The other action must still exist
assert "material_action" in toolbar.widgets
assert material_icon_action.action in toolbar.actions()
def test_remove_last_action_from_bundle_removes_bundle(toolbar_fixture, dummy_widget, icon_action):
"""
Removing the final action from a bundle should delete the bundle entry itself.
"""
toolbar = toolbar_fixture
bundle = ToolbarBundle(bundle_id="single_action_bundle", actions=[("only_action", icon_action)])
toolbar.add_bundle(bundle, dummy_widget)
# Sanity check
assert "single_action_bundle" in toolbar.bundles
assert "only_action" in toolbar.widgets
# Remove the sole action
toolbar.remove_action("only_action")
# Bundle should be gone
assert "single_action_bundle" not in toolbar.bundles
# QAction removed
assert icon_action.action not in toolbar.actions()
def test_remove_entire_bundle(toolbar_fixture, dummy_widget, icon_action, material_icon_action):
"""
Ensure that removing a bundle deletes all its actions and separators.
"""
toolbar = toolbar_fixture
bundle = ToolbarBundle(
bundle_id="to_remove",
actions=[("icon_action", icon_action), ("material_action", material_icon_action)],
)
toolbar.add_bundle(bundle, dummy_widget)
# Confirm bundle presence
assert "to_remove" in toolbar.bundles
# Remove the whole bundle
toolbar.remove_bundle("to_remove")
# Bundle mapping gone
assert "to_remove" not in toolbar.bundles
# All actions gone
for aid, act in [("icon_action", icon_action), ("material_action", material_icon_action)]:
assert aid not in toolbar.widgets
assert act.action not in toolbar.actions()
def test_trigger_removed_action_raises(toolbar_fixture, icon_action, dummy_widget, qtbot):
"""
Add an action, connect a mock slot, then remove the action and verify that
attempting to trigger it afterwards raises RuntimeError (since the underlying
QAction has been deleted).
"""
toolbar = toolbar_fixture
# Add the action and connect a mock slot
toolbar.add_action("icon_action", icon_action, dummy_widget)
called = []
def mock_slot():
called.append(True)
icon_action.action.triggered.connect(mock_slot)
# Trigger once to confirm connection works
icon_action.action.trigger()
assert called == [True]
# Now remove the action
toolbar.remove_action("icon_action")
# Allow deleteLater event to process
qtbot.wait(50)
# The underlying C++ object should be deleted; triggering should raise
with pytest.raises(RuntimeError):
icon_action.action.trigger()
def test_remove_nonexistent_action(toolbar_fixture):
"""
Attempting to remove an action that does not exist should raise ValueError.
"""
toolbar = toolbar_fixture
with pytest.raises(ValueError) as excinfo:
toolbar.remove_action("nonexistent_action")
assert "Action with ID 'nonexistent_action' does not exist." in str(excinfo.value)
def test_remove_nonexistent_bundle(toolbar_fixture):
"""
Attempting to remove a bundle that does not exist should raise ValueError.
"""
toolbar = toolbar_fixture
with pytest.raises(ValueError) as excinfo:
toolbar.remove_bundle("nonexistent_bundle")
assert "Bundle 'nonexistent_bundle' does not exist." in str(excinfo.value)

View File

@@ -1,9 +1,11 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
from unittest.mock import MagicMock
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_lib.messages import AvailableResourceMessage, ScanQueueHistoryMessage, ScanQueueMessage
from qtpy.QtCore import QModelIndex, QPoint, Qt
from bec_widgets.utils.forms_from_types.items import StrMetadataField
from bec_widgets.utils.widget_io import WidgetIO
@@ -540,6 +542,29 @@ def test_get_scan_parameters_from_redis(scan_control, mocked_client):
assert kwargs == {"steps": 10, "relative": False, "exp_time": 2.0, "burst_at_each_point": 1}
TEST_MD = {"sample_name": "Test Sample", "test key 1": "test value 1", "test key 2": "test value 2"}
TEST_TABLE_ENTRY = [["test key 1", "test value 1"], ["test key 2", "test value 2"]]
def test_scan_metadata_is_updated_even_without_default_form_changes(
scan_control: ScanControl, qtbot
):
assert scan_control._metadata_form._scan_name == "line_scan"
scan_control.comboBox_scan_selection.setCurrentText("grid_scan")
assert scan_control._metadata_form._scan_name == "grid_scan"
scan_control._metadata_form._additional_metadata._add_button.click()
qtbot.wait(100)
table_model = scan_control._metadata_form._additional_metadata._table_model
model_key = table_model.index(0, 0, QModelIndex())
table_model.setData(model_key, "test key 1", Qt.EditRole)
model_value = model_key.siblingAtColumn(1)
table_model.setData(model_value, "test value 1", Qt.EditRole)
assert scan_control._metadata_form._additional_metadata.dump_dict() == {
"test key 1": "test value 1"
}
assert scan_control._scan_metadata == {"sample_name": "", "test key 1": "test value 1"}
def test_scan_metadata_is_connected(scan_control):
assert scan_control._metadata_form._scan_name == "line_scan"
scan_control.comboBox_scan_selection.setCurrentText("grid_scan")
@@ -548,16 +573,28 @@ def test_scan_metadata_is_connected(scan_control):
assert isinstance(sample_name, StrMetadataField)
sample_name._main_widget.setText("Test Sample")
scan_control._metadata_form._additional_metadata._table_model._data = [
["test key 1", "test value 1"],
["test key 2", "test value 2"],
]
scan_control._metadata_form._additional_metadata._table_model._data = TEST_TABLE_ENTRY
scan_control._metadata_form.validate_form()
assert scan_control._scan_metadata == {
"sample_name": "Test Sample",
"test key 1": "test value 1",
"test key 2": "test value 2",
}
assert scan_control._scan_metadata == TEST_MD
def test_scan_metadata_is_passed_to_scan_function(scan_control: ScanControl):
scan_control.comboBox_scan_selection.setCurrentText("grid_scan")
sample_name = scan_control._metadata_form._form_grid.layout().itemAtPosition(0, 1).widget()
sample_name._main_widget.setText("Test Sample")
scan_control._metadata_form._additional_metadata._table_model._data = TEST_TABLE_ENTRY
scan_control._metadata_form.validate_form()
assert scan_control._scan_metadata == TEST_MD
scans = SimpleNamespace(grid_scan=MagicMock())
with (
patch.object(scan_control, "scans", scans),
patch.object(scan_control, "get_scan_parameters", lambda: ((), {})),
):
scan_control.run_scan()
scans.grid_scan.assert_called_once_with(metadata=TEST_MD)
def test_restore_parameters_with_fewer_arg_bundles(scan_control, qtbot):

View File

@@ -0,0 +1,90 @@
from unittest import mock
import pytest
from qtpy.QtNetwork import QAuthenticator
from bec_widgets.widgets.editors.web_console.web_console import WebConsole, _web_console_registry
from .client_mocks import mocked_client
@pytest.fixture
def console_widget(qtbot, mocked_client):
with mock.patch(
"bec_widgets.widgets.editors.web_console.web_console.subprocess"
) as mock_subprocess:
with mock.patch.object(_web_console_registry, "_wait_for_server_port"):
_web_console_registry._server_port = 12345
# Create the WebConsole widget
widget = WebConsole(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_web_console_widget_initialization(console_widget):
assert (
console_widget.page.url().toString()
== f"http://localhost:{_web_console_registry._server_port}"
)
def test_web_console_write(console_widget):
# Test the write method
with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js:
console_widget.write("Hello, World!")
assert mock.call("window.term.paste('Hello, World!');") in mock_run_js.mock_calls
def test_web_console_write_no_return(console_widget):
# Test the write method with send_return=False
with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js:
console_widget.write("Hello, World!", send_return=False)
assert mock.call("window.term.paste('Hello, World!');") in mock_run_js.mock_calls
assert mock_run_js.call_count == 1
def test_web_console_send_return(console_widget):
# Test the send_return method
with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js:
console_widget.send_return()
script = mock_run_js.call_args[0][0]
assert "new KeyboardEvent('keypress', {charCode: 13})" in script
assert mock_run_js.call_count == 1
def test_web_console_send_ctrl_c(console_widget):
# Test the send_ctrl_c method
with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js:
console_widget.send_ctrl_c()
script = mock_run_js.call_args[0][0]
assert "new KeyboardEvent('keypress', {charCode: 3})" in script
assert mock_run_js.call_count == 1
def test_web_console_authenticate(console_widget):
# Test the _authenticate method
token = _web_console_registry._token
mock_auth = mock.MagicMock(spec=QAuthenticator)
console_widget._authenticate(None, mock_auth)
mock_auth.setUser.assert_called_once_with("user")
mock_auth.setPassword.assert_called_once_with(token)
def test_web_console_registry_wait_for_server_port():
# Test the _wait_for_server_port method
with mock.patch.object(_web_console_registry, "_server_process") as mock_subprocess:
mock_subprocess.stderr.readline.side_effect = [b"Starting", b"Listening on port: 12345"]
_web_console_registry._wait_for_server_port()
assert _web_console_registry._server_port == 12345
def test_web_console_registry_wait_for_server_port_timeout():
# Test the _wait_for_server_port method with timeout
with mock.patch.object(_web_console_registry, "_server_process") as mock_subprocess:
with pytest.raises(TimeoutError):
_web_console_registry._wait_for_server_port(timeout=0.1)