1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-10 02:30:54 +02:00

Compare commits

..

10 Commits

Author SHA1 Message Date
504794f86a test: cleanup to simplify testing 2024-12-03 09:04:11 +01:00
2e5ee7c5bd test: fix tests, avoid spinning up VSCode and BECConsole 2024-12-02 17:53:07 +01:00
b87cab6744 fix: clean up, run cli script, add docs 2024-12-02 17:42:11 +01:00
9ac4ce73ff fix: add cleanup to console, fix tests for user_script 2024-12-02 17:10:57 +01:00
710d7229a7 tests: add test for user script widget 2024-12-02 17:10:57 +01:00
9402ba82ff feat: add user script widget 2024-12-02 17:10:46 +01:00
semantic-release
a274a14900 1.7.0
Automatically generated by python-semantic-release
2024-12-02 15:21:52 +00:00
da579b6d21 fix(tests): add test for Console widget 2024-12-02 14:44:29 +01:00
02086aeae0 feat(console): add 'terminate' and 'send_ctrl_c' methods to Console
.terminate() ends the started process, sending SIGTERM signal.
If process is not dead after optional timeout, SIGKILL is sent.
.send_ctrl_c() sends SIGINT to the child process, and waits for
prompt until optional timeout is reached.
Timeouts raise 'TimeoutError' exception.
2024-12-02 14:44:29 +01:00
3aeb0b66fb feat(console): add "prompt" signal to inform when shell is at prompt 2024-12-02 14:44:29 +01:00
16 changed files with 1115 additions and 49 deletions

View File

@@ -1,14 +1,32 @@
# CHANGELOG
## v1.7.0 (2024-12-02)
### Bug Fixes
- **tests**: Add test for Console widget
([`da579b6`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/da579b6d213bcdf28c40c1a9e4e2535fdde824fb))
### Features
- **console**: Add "prompt" signal to inform when shell is at prompt
([`3aeb0b6`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/3aeb0b66fbeb03d3d0ee60e108cc6b98fd9aa9b9))
- **console**: Add 'terminate' and 'send_ctrl_c' methods to Console
([`02086ae`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/02086aeae09233ec4e6ccc0e6a17f2b078d500b8))
.terminate() ends the started process, sending SIGTERM signal. If process is not dead after optional
timeout, SIGKILL is sent. .send_ctrl_c() sends SIGINT to the child process, and waits for prompt
until optional timeout is reached. Timeouts raise 'TimeoutError' exception.
## v1.6.0 (2024-11-27)
### Bug Fixes
- **tests**: Make use of BECDockArea with client mixin to start server and use it in tests
([`da18c2c`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/da18c2ceecf9aeaf0e0ea9b78f4c867b27b9c314))
Depending on the test, auto-updates are enabled or not.
- Add back accidentally removed variables
([`e998352`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/e9983521ed2a1c04af048a55ece70a1943a84313))
- Differentiate click and drag for DeviceItem, adapt tests accordingly
([`cffcdf2`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/cffcdf292363249bcc7efa9d130431d0bc727fda))
@@ -17,12 +35,6 @@ This fixes the blocking "QDrag.exec_()" on Linux, indeed before the drag'n'drop
started with a simple click and it was waiting for drop forever. Now there are 2 different cases,
click or drag'n'drop - the drag'n'drop test actually moves the mouse and releases the button.
- **server**: Use dock area by default
([`2fe7f5e`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/2fe7f5e1510a5ea72676045e6ea3485e6b11c220))
- **rpc**: Gui hide/show also hide/show all floating docks
([`c27d058`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/c27d058b01fe604eccec76454e39360122e48515))
- Do not quit automatically when last window is "closed"
([`96e255e`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/96e255e4ef394eb79006a66d13e06775ae235667))
@@ -31,8 +43,16 @@ Qt confuses closed and hidden
- No need to call inspect.signature - it can fail on methods coming from C (like Qt methods)
([`6029246`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/60292465e9e52d3248ae681c68c07298b9b3ce14))
- Add back accidentally removed variables
([`e998352`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/e9983521ed2a1c04af048a55ece70a1943a84313))
- **rpc**: Gui hide/show also hide/show all floating docks
([`c27d058`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/c27d058b01fe604eccec76454e39360122e48515))
- **server**: Use dock area by default
([`2fe7f5e`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/2fe7f5e1510a5ea72676045e6ea3485e6b11c220))
- **tests**: Make use of BECDockArea with client mixin to start server and use it in tests
([`da18c2c`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/da18c2ceecf9aeaf0e0ea9b78f4c867b27b9c314))
Depending on the test, auto-updates are enabled or not.
### Features
@@ -40,26 +60,26 @@ Qt confuses closed and hidden
BECDockArea
([`31d8703`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/31d87036c9801e639a7ca6fc003c90e0c4edb19d))
- Add rpc_id member to client objects
([`3ba0b1d`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/3ba0b1daf5b83da840e90fbbc063ed7b86ebe99b))
- **client**: Add show()/hide() methods to "gui" object
([`e68e2b5`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/e68e2b5978339475b97555c3e20795807932fbc9))
- **server**: Add main window, with proper gui_id derived from given id
([`daf6ea0`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/daf6ea0159c9ffc7b53bb7ae6b9abc16a302972c))
- Add '--hide' argument to BEC GUI server
([`1f60fec`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/1f60fec7201ed252d7e49bf16f2166ee7f6bed6a))
- Add main window container widget
([`f80ec33`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/f80ec33ae5a261dbcab901ae30f4cc802316e554))
- Add rpc_id member to client objects
([`3ba0b1d`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/3ba0b1daf5b83da840e90fbbc063ed7b86ebe99b))
- Asynchronous .start() for GUI
([`2047e48`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/2047e484d5a4b2f5ea494a1e49035b35b1bbde35))
- Do not take focus when GUI is loaded
([`1f71d8e`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/1f71d8e5eded9952f9b34bfc427e2ff44cf5fc18))
- Add '--hide' argument to BEC GUI server
([`1f60fec`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/1f60fec7201ed252d7e49bf16f2166ee7f6bed6a))
- **client**: Add show()/hide() methods to "gui" object
([`e68e2b5`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/e68e2b5978339475b97555c3e20795807932fbc9))
- **server**: Add main window, with proper gui_id derived from given id
([`daf6ea0`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/daf6ea0159c9ffc7b53bb7ae6b9abc16a302972c))
## v1.5.3 (2024-11-21)
@@ -192,19 +212,3 @@ Qt confuses closed and hidden
- **colormap_button**: Colormap button with menu to select colormap filtered by the colormap type
([`b039933`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/b039933405e2fbe92bd81bd0748e79e8d443a741))
## v1.2.0 (2024-10-25)
### Features
- **colors**: Evenly spaced color generation + new golden ratio calculation
([`40c9fea`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/40c9fea35f869ef52e05948dd1989bcd99f602e0))
### Refactoring
- Add bec_lib version to statusbox
([`5d4b86e`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/5d4b86e1c6e1800051afce4f991153e370767fa6))
## v1.1.0 (2024-10-25)

View File

@@ -19,7 +19,6 @@ class Widgets(str, enum.Enum):
BECColorMapWidget = "BECColorMapWidget"
BECDockArea = "BECDockArea"
BECImageWidget = "BECImageWidget"
BECMainWindow = "BECMainWindow"
BECMotorMapWidget = "BECMotorMapWidget"
BECMultiWaveformWidget = "BECMultiWaveformWidget"
BECProgressBar = "BECProgressBar"
@@ -43,6 +42,7 @@ class Widgets(str, enum.Enum):
SignalLineEdit = "SignalLineEdit"
StopButton = "StopButton"
TextBox = "TextBox"
UserScriptWidget = "UserScriptWidget"
VSCodeEditor = "VSCodeEditor"
WebsiteWidget = "WebsiteWidget"
@@ -64,6 +64,13 @@ class AbortButton(RPCBase):
Get all registered RPC objects.
"""
@property
@rpc_call
def _rpc_id(self) -> "str":
"""
Get the RPC ID of the widget.
"""
class BECColorMapWidget(RPCBase):
@property
@@ -3682,6 +3689,9 @@ class TextBox(RPCBase):
"""
class UserScriptWidget(RPCBase): ...
class VSCodeEditor(RPCBase): ...

View File

@@ -10,9 +10,14 @@ import fcntl
import html
import os
import pty
import re
import signal
import sys
import time
import pyte
from bec_lib.logger import bec_logger
from pygments.token import Token
from pyte.screens import History
from qtpy import QtCore, QtGui, QtWidgets
from qtpy.QtCore import Property as pyqtProperty
@@ -23,6 +28,8 @@ from qtpy.QtWidgets import QApplication, QHBoxLayout, QScrollBar, QSizePolicy
from bec_widgets.qt_utils.error_popups import SafeSlot as Slot
logger = bec_logger.logger
ansi_colors = {
"black": "#000000",
"red": "#CD0000",
@@ -235,10 +242,14 @@ class BECConsole(QtWidgets.QWidget):
PLUGIN = True
ICON_NAME = "terminal"
prompt = pyqtSignal(bool)
def __init__(self, parent=None, cols=132):
super().__init__(parent)
self.term = _TerminalWidget(self, cols, rows=43)
self.term.prompt.connect(self.prompt) # forward signal from term to this widget
self.scroll_bar = QScrollBar(Qt.Vertical, self)
# self.scroll_bar.hide()
layout = QHBoxLayout(self)
@@ -320,9 +331,56 @@ class BECConsole(QtWidgets.QWidget):
def start(self, deactivate_ctrl_d=True):
self.term.start(deactivate_ctrl_d=deactivate_ctrl_d)
def push(self, text):
def push(self, text, hit_return=False):
"""Push some text to the terminal"""
return self.term.push(text)
return self.term.push(text, hit_return=hit_return)
def execute_command(self, command):
self.push(command, hit_return=True)
def set_prompt_tokens(self, *tokens):
"""Prepare regexp to identify prompt, based on tokens
Tokens are returned from get_ipython().prompts.in_prompt_tokens()
"""
regex_parts = []
for token_type, token_value in tokens:
if token_type == Token.PromptNum: # Handle dynamic prompt number
regex_parts.append(r"[\d\?]+") # Match one or more digits or '?'
else:
# Escape other prompt parts (e.g., "In [", "]: ")
if not token_value:
regex_parts.append(".+?") # arbitrary string
else:
regex_parts.append(re.escape(token_value))
# Combine into a single regex
prompt_pattern = "".join(regex_parts)
self.term._prompt_re = re.compile(prompt_pattern + r"\s*$")
def terminate(self, timeout=10):
self.term.stop(timeout=timeout)
def send_ctrl_c(self, timeout=None):
self.term.send_ctrl_c(timeout)
def cleanup(self):
"""Cleanup the terminal"""
self.execute_command("\x03") # Ctrl-C
self.execute_command("exit()")
timeout = 5
interval = 0.1
timer = 0
# os.close(self.term.fd)
while self.term.fd is not None:
time.sleep(interval)
timer += interval
if timer > 0.8 * timeout:
logger.warning(f"Terminal still cleaning up after {timer:.1f} seconds")
if timer > timeout:
logger.error(f"Terminal cleanup timed out after {timeout} seconds")
break
self.deleteLater()
cols = pyqtProperty(int, get_cols, set_cols)
rows = pyqtProperty(int, get_rows, set_rows)
@@ -336,7 +394,15 @@ class _TerminalWidget(QtWidgets.QPlainTextEdit):
Start ``Backend`` process and render Pyte output as text.
"""
prompt = pyqtSignal(bool)
def __init__(self, parent, cols=125, rows=50, **kwargs):
# regexp to match prompt
self._prompt_re = None
# last prompt
self._prompt_str = None
# process pid
self.pid = None
# file descriptor to communicate with the subprocess
self.fd = None
self.backend = None
@@ -433,7 +499,7 @@ class _TerminalWidget(QtWidgets.QPlainTextEdit):
self.update_term_size()
# Start the Bash process
self.fd = self.fork_shell()
self.pid, self.fd = self.fork_shell()
if self.fd:
# Create the ``Backend`` object
@@ -449,6 +515,62 @@ class _TerminalWidget(QtWidgets.QPlainTextEdit):
self.appendHtml(f"<br><h2>{repr(self._cmd)} - Process exited.</h2>")
self.setReadOnly(True)
def send_ctrl_c(self, wait_prompt=True, timeout=None):
"""Send CTRL-C to the process
If wait_prompt=True (default), wait for a new prompt after CTRL-C
If no prompt is displayed after 'timeout' seconds, TimeoutError is raised
"""
os.kill(self.pid, signal.SIGINT)
if wait_prompt:
timeout_error = False
if timeout:
def set_timeout_error():
nonlocal timeout_error
timeout_error = True
timeout_timer = QTimer()
timeout_timer.singleShot(timeout * 1000, set_timeout_error)
while self._prompt_str is None:
QApplication.instance().process_events()
if timeout_error:
raise TimeoutError(
f"CTRL-C: could not get back to prompt after {timeout} seconds."
)
def _is_running(self):
if os.waitpid(self.pid, os.WNOHANG) == (0, 0):
return True
return False
def stop(self, kill=True, timeout=None):
"""Stop the running process
SIGTERM is the default signal for terminating processes.
If kill=True (default), SIGKILL will be sent if the process does not exit after timeout
"""
# try to exit gracefully
os.kill(self.pid, signal.SIGTERM)
# wait until process is truly dead
t0 = time.perf_counter()
while self._is_running():
time.sleep(1)
if timeout is not None and time.perf_counter() - t0 > timeout:
# still alive after 'timeout' seconds
if kill:
# send SIGKILL and make a last check in loop
os.kill(self.pid, signal.SIGKILL)
kill = False
else:
# still running after timeout...
raise TimeoutError(
f"Could not terminate process with pid: {self.pid} within timeout"
)
self.process_exited()
def data_ready(self, screen):
"""Handle new screen: redraw, set scroll bar max and slider, move cursor to its position
@@ -540,11 +662,13 @@ class _TerminalWidget(QtWidgets.QPlainTextEdit):
elif code is not None:
self.write(code)
def push(self, text):
def push(self, text, hit_return=False):
"""
Write 'text' to terminal
"""
self.write(text.encode("utf-8"))
if hit_return:
self.write(b"\n")
def contextMenuEvent(self, event):
if self.fd is None:
@@ -650,6 +774,20 @@ class _TerminalWidget(QtWidgets.QPlainTextEdit):
self.output[line_no] = line
# fill the text area with HTML contents in one go
self.appendHtml(f"<pre>{chr(10).join(self.output)}</pre>")
if self._prompt_re is not None:
text_buf = self.toPlainText()
prompt = self._prompt_re.search(text_buf)
if prompt is None:
if self._prompt_str:
self.prompt.emit(False)
self._prompt_str = None
else:
prompt_str = prompt.string.rstrip()
if prompt_str != self._prompt_str:
self._prompt_str = prompt_str
self.prompt.emit(True)
# did updates, all clean
screen.dirty.clear()
@@ -711,7 +849,7 @@ class _TerminalWidget(QtWidgets.QPlainTextEdit):
# We are in the parent process.
# Set file control
fcntl.fcntl(fd, fcntl.F_SETFL, os.O_NONBLOCK)
return fd
return pid, fd
if __name__ == "__main__":
@@ -728,6 +866,24 @@ if __name__ == "__main__":
console = BECConsole(mainwin)
mainwin.setCentralWidget(console)
def check_prompt(at_prompt):
if at_prompt:
print("NEW PROMPT")
else:
print("EXECUTING SOMETHING...")
console.set_prompt_tokens(
(Token.OutPromptNum, ""),
(Token.Prompt, ""), # will match arbitrary string,
(Token.Prompt, " ["),
(Token.PromptNum, "3"),
(Token.Prompt, "/"),
(Token.PromptNum, "1"),
(Token.Prompt, "] "),
(Token.Prompt, ""),
)
console.prompt.connect(check_prompt)
console.start()
# Show widget and launch Qt's event loop.

View File

@@ -0,0 +1,17 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.editors.user_script.user_script_widget_plugin import (
UserScriptWidgetPlugin,
)
QPyDesignerCustomWidgetCollection.addCustomWidget(UserScriptWidgetPlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -0,0 +1,569 @@
import glob
import importlib
import inspect
import os
import pathlib
from collections import defaultdict
from pathlib import Path
from typing import Literal
import bec_lib
from bec_qthemes import material_icon
from pydantic import BaseModel
from pygments.token import Token
from qtpy.QtCore import QSize, Signal, Slot
from qtpy.QtWidgets import (
QDialog,
QGridLayout,
QGroupBox,
QHBoxLayout,
QHeaderView,
QLabel,
QLineEdit,
QPushButton,
QSizePolicy,
QSpacerItem,
QToolButton,
QTreeWidget,
QTreeWidgetItem,
QVBoxLayout,
QWidget,
)
from bec_widgets.qt_utils.error_popups import SafeSlot
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_accent_colors, set_theme
from bec_widgets.widgets.editors.console.console import BECConsole
from bec_widgets.widgets.editors.vscode.vscode import VSCodeEditor
logger = bec_lib.bec_logger.logger
class EnchancedQTreeWidget(QTreeWidget):
"""Thin wrapper around QTreeWidget to add some functionality for user scripting"""
play_button_clicked = Signal(str)
edit_button_clicked = Signal(str)
def __init__(self, parent=None):
super().__init__(parent)
self.setColumnCount(2)
self.setHeaderHidden(True)
self.setObjectName(__class__.__name__)
self._update_style_sheet()
self._icon_size = QSize(24, 24)
self.setRootIsDecorated(False)
self.setUniformRowHeights(True)
self.setWordWrap(True)
self.setAnimated(True)
self.setIndentation(24)
self._adjust_size_policy()
def _adjust_size_policy(self):
"""Adjust the size policy"""
header = self.header()
header.setMinimumSectionSize(42)
header.setSectionResizeMode(0, QHeaderView.ResizeToContents)
header.setSectionResizeMode(1, QHeaderView.Stretch)
def _update_style_sheet(self) -> None:
"""Update the style sheet"""
name = __class__.__name__
colors = get_accent_colors()
# pylint: disable=protected-access
color = colors._palette.midlight().color().name()
self.setStyleSheet(
f"""
{name}::item {{
border: none;
background: transparent;
}}
QTreeView::branch:hover {{
background: transparent;
color: {color};
}}
{name}::item:hover {{
background: {color};
}}
{name}::item:selected:hover {{
background: {color};
}}
"""
)
def add_top_item(self, label: str) -> QTreeWidgetItem:
"""Add a top item to the tree widget
Args:
label (str): The label for the top item
Returns:
QTreeWidgetItem: The top item
"""
top_item = QTreeWidgetItem(self, [label])
top_item.setExpanded(True)
top_item.setSelected(False)
self.resizeColumnToContents(0)
return top_item
def add_module_item(self, top_item: QTreeWidgetItem, mod_name: str) -> QTreeWidgetItem:
"""Add a top item to the tree widget together with an edit button in column 0 and label in 1
Args:
top_item (QTreeWidgetItem): The top item to add the child item to
mod_name (str): The label for the child item
Returns:
QTreeWidgetItem: The top item
"""
child_item = QTreeWidgetItem(top_item)
# Add label
label = QLabel(mod_name, parent=top_item.treeWidget())
# Add edit button with label as parent
edit_button = self._create_button(parent=label, button_type="edit")
edit_button.clicked.connect(self._handle_edit_button_clicked)
self.setItemWidget(child_item, 0, edit_button)
self.setItemWidget(child_item, 1, label)
self.resizeColumnToContents(0)
return child_item
def add_child_item(self, top_item: QTreeWidgetItem, label: str) -> None:
"""Add a child item to the top item together with a play button in column 1
Args:
top_item (QTreeWidgetItem): The top item to add the child item to
label (str): The label for the child item
Returns:
QTreeWidgetItem: The child item
"""
widget = QWidget(top_item.treeWidget())
label = QLabel(label)
spacer = QSpacerItem(0, 0, QSizePolicy.Expanding, QSizePolicy.Minimum)
layout = QHBoxLayout(widget)
layout.addWidget(label)
layout.addItem(spacer)
layout.setSpacing(4)
layout.setContentsMargins(0, 0, 0, 0)
button = self._create_button(parent=top_item.treeWidget(), button_type="play")
button.clicked.connect(self._handle_play_button_clicked)
layout.addWidget(button)
child_item = QTreeWidgetItem(top_item)
self.setItemWidget(child_item, 1, widget)
return child_item
@Slot()
def _handle_edit_button_clicked(self):
"""Handle the click of the edit button"""
button = self.sender()
tree_widget_item = self.itemAt(button.pos())
text = self.itemWidget(tree_widget_item, 1).text()
self.edit_button_clicked.emit(text)
@Slot()
def _handle_play_button_clicked(self):
"""Handle the click of the play button"""
button = self.sender()
widget = button.parent()
text = widget.findChild(QLabel).text()
self.play_button_clicked.emit(text)
def _create_button(self, parent: QWidget, button_type: Literal["edit", "play"]) -> QToolButton:
"""Create a button for 'edit' or 'play'
Args:
button_type (Literal["edit", "play"]): The type of button to create
"""
colors = get_accent_colors()
if button_type == "edit":
color = colors.highlight
name = "edit_document"
elif button_type == "play":
color = colors.success
name = "play_arrow"
else:
raise ValueError("Invalid button type")
button = QToolButton(
parent=parent,
icon=material_icon(
name, filled=False, color=color, size=self._icon_size, convert_to_pixmap=False
),
)
button.setContentsMargins(0, 0, 0, 0)
button.setStyleSheet("QToolButton { border: none; }")
return button
def _hide_buttons(self, exclude_item: QWidget = None):
for button in self.viewport().findChildren(QToolButton):
if exclude_item is not None:
if button.parent() == exclude_item:
continue
button.setVisible(False)
class VSCodeDialog(QDialog):
"""Dialog for the VSCode editor"""
def __init__(self, parent=None, client=None, editor: VSCodeEditor = None):
super().__init__(parent=parent)
self.setWindowTitle("VSCode Editor")
self.setMinimumWidth(800)
self.setMinimumHeight(600)
self.layout = QVBoxLayout(self)
self.editor = editor
self.init_ui()
def init_ui(self):
"""Initialize the UI. Note: this makes the code easier to test."""
self.layout.addWidget(self.editor)
class InputDialog(QDialog):
"""Dialog for input
Args:
header (str): The header of the dialog
info (str): The information of the dialog
fields (dict): The fields of the dialog
parent (QWidget): The parent widget
"""
def __init__(self, header: str, info: str, fields: dict, parent=None):
super().__init__(parent=parent)
self.header = header
self.info = info
self.fields = fields
self._layout = QVBoxLayout(self)
self.button_ok = QPushButton(parent=self, text="OK")
self.button_cancel = QPushButton(parent=self, text="Cancel")
self._init_ui()
self.button_ok.clicked.connect(self.accept)
self.button_cancel.clicked.connect(self.reject)
def _init_ui(self):
"""Initialize the UI"""
self.setWindowTitle(f"{self.header}")
self.setMinimumWidth(200)
box = QGroupBox(self)
box.setTitle(self.info)
layout = QGridLayout(box)
layout.setSpacing(4)
layout.setContentsMargins(4, 30, 4, 30)
row = 0
for name, default in self.fields.items():
label = QLabel(parent=self, text=name)
line_input = QLineEdit(parent=self)
line_input.setObjectName(name)
if default is not None:
line_input.setText(f"{default}")
layout.addWidget(label, row, 0)
layout.addWidget(line_input, row, 1)
row += 1
self._layout.addWidget(box)
widget = QWidget(self)
sub_layout = QHBoxLayout(widget)
sub_layout.addWidget(self.button_ok)
sub_layout.addWidget(self.button_cancel)
self._layout.addWidget(widget)
self.setLayout(self._layout)
self.resize(self._layout.sizeHint() * 1.05)
def get_inputs(self):
"""Get the input from the dialog"""
out = {}
for name, _ in self.fields.items():
line_input = self.findChild(QLineEdit, name)
if line_input is not None:
out[name] = line_input.text()
return out
class ScriptBlock(BaseModel):
"""Model block for a script"""
location: Literal["BEC", "USER", "BL"]
fname: str
module_name: str
user_script_name: str | None = None
class UserScriptWidget(BECWidget, QWidget):
"""Dialog for displaying the fit summary and params for LMFit DAP processes."""
PLUGIN = True
USER_ACCESS = []
ICON_NAME = "manage_accounts"
def __init__(
self,
parent=None,
client=None,
config=None,
gui_id: str | None = None,
vs_code_editor=None,
bec_console=None,
):
"""
Initialize the widget
Args:
parent (QWidget): The parent widget
client (BECClient): The BEC client
config (dict): The configuration
gui_id (str): The GUI ID
vs_code_editor (VSCodeEditor): The VSCode editor, dep injection here makes makes testing easier, if None defaults to VSCodeEditor
bec_console (BECConsole): The BEC console, note this makes testing easier, if None defaults to BECConsole
"""
super().__init__(client=client, config=config, gui_id=gui_id, theme_update=True)
QWidget.__init__(self, parent=parent)
self.button_new_script = QPushButton(parent=self, text="New Script")
self.button_new_script.setObjectName("button_new_script")
if vs_code_editor is None:
vs_code_editor = VSCodeEditor(parent=self, client=self.client, gui_id=self.gui_id)
self._vscode_editor = vs_code_editor
if bec_console is None:
bec_console = BECConsole(parent=self)
self._console = bec_console
self.tree_widget = EnchancedQTreeWidget(parent=self)
self.layout = QVBoxLayout(self)
self.user_scripts = defaultdict(lambda: ScriptBlock)
self._base_path = os.path.join(str(Path.home()), "bec", "scripts")
self._icon_size = QSize(16, 16)
self._script_button_register = {}
self._code_dialog = None
self._script_dialog = None
self._new_script_dialog = None
self.init_ui()
self.button_new_script.clicked.connect(self.new_script)
self.tree_widget.edit_button_clicked.connect(self.handle_edit_button_clicked)
self.tree_widget.play_button_clicked.connect(self.handle_play_button_clicked)
def apply_theme(self, theme: str):
"""Apply the theme"""
self._update_button_ui()
self.update_user_scripts()
self.tree_widget._update_style_sheet()
super().apply_theme(theme)
def _setup_console(self):
"""Setup the console. Toents are needed to allow for the console to check for the prompt during shutdown."""
self._console.set_prompt_tokens(
(Token.OutPromptNum, ""),
(Token.Prompt, ""), # will match arbitrary string,
(Token.Prompt, " ["),
(Token.PromptNum, "3"),
(Token.Prompt, "/"),
(Token.PromptNum, "1"),
(Token.Prompt, "] "),
(Token.Prompt, ""),
)
self._console.start()
# Comment to not hide the console for debugging
self._console.hide()
def init_ui(self):
"""Initialize the UI"""
# Add buttons
widget = QWidget(self)
layout = QHBoxLayout(widget)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(4)
layout.addWidget(self.button_new_script)
self.layout.addWidget(widget)
self.layout.addWidget(self.tree_widget)
# Uncomment to show the console for debugging
# self.layout.addWidget(self._console)
self.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
self._vscode_editor.hide()
self._update_button_ui()
self._setup_console()
self.update_user_scripts()
self._vscode_editor.file_saved.connect(self._handle_file_saved)
@Slot(str)
def _handle_file_saved(self, fname: str):
"""Handle the file saved signal"""
self.update_user_scripts()
def _update_button_ui(self):
"""Update the button UI"""
colors = get_accent_colors()
name = self.button_new_script.objectName()
self.button_new_script.setStyleSheet(
f"QWidget#{name} {{ color: {colors._palette.windowText().color().name()}; }}"
)
def save_script(self):
"""Save the script"""
self._vscode_editor.save_file()
self._vscode_editor.hide()
if self._code_dialog is not None:
self._code_dialog.hide()
self.update_user_scripts()
def open_script(self, fname: str):
"""Open a script
Args:
fname (str): The file name of the script
"""
if self._code_dialog is None:
self._code_dialog = VSCodeDialog(parent=self, editor=self._vscode_editor)
self._code_dialog.show()
self._vscode_editor.show()
# Only works after show was called for the first time
self._vscode_editor.zen_mode()
else:
self._code_dialog.show()
self._vscode_editor.show()
self._vscode_editor.open_file(fname)
@SafeSlot(popup_error=True)
def new_script(self, *args, **kwargs):
"""Create a new script"""
self._new_script_dialog = InputDialog(
header="New Script", info="Enter filename for new script", fields={"Filename": ""}
)
if self._new_script_dialog.exec_():
name = self._new_script_dialog.get_inputs()["Filename"]
check_name = name.replace("_", "").replace("-", "")
if not check_name.isalnum() or not check_name.isascii():
raise NameError(f"Invalid name {name}, must be alphanumeric and ascii")
if not name.endswith(".py"):
name = name + ".py"
fname = os.path.join(self._base_path, name)
# Check if file exists on disk
if os.path.exists(fname):
logger.error(f"File {fname} already exists")
raise FileExistsError(f"File {fname} already exists")
try:
os.makedirs(os.path.dirname(fname), exist_ok=True, mode=0o775)
with open(fname, "w", encoding="utf-8") as f:
f.write("# New BEC Script\n")
except Exception as e:
logger.error(f"Error creating new script: {e}")
raise e
self.open_script(fname)
def get_script_files(self) -> dict:
"""Get all script files in the base path"""
files = {"BEC": [], "USER": [], "BL": []}
# bec
bec_lib_path = pathlib.Path(bec_lib.__file__).parent.parent.resolve()
bec_scripts_dir = os.path.join(str(bec_lib_path), "scripts")
files["BEC"].extend(glob.glob(os.path.abspath(os.path.join(bec_scripts_dir, "*.py"))))
# user
user_scripts_dir = os.path.join(os.path.expanduser("~"), "bec", "scripts")
if os.path.exists(user_scripts_dir):
files["USER"].extend(glob.glob(os.path.abspath(os.path.join(user_scripts_dir, "*.py"))))
# load scripts from the beamline plugin
plugins = importlib.metadata.entry_points(group="bec")
for plugin in plugins:
if plugin.name == "plugin_bec":
plugin = plugin.load()
plugin_scripts_dir = os.path.join(plugin.__path__[0], "scripts")
if os.path.exists(plugin_scripts_dir):
files["BL"].extend(
glob.glob(os.path.abspath(os.path.join(plugin_scripts_dir, "*.py")))
)
return files
@SafeSlot()
def reload_user_scripts(self, *args, **kwargs):
"""Reload the user scripts"""
self.client.load_all_user_scripts()
@Slot()
def update_user_scripts(self) -> None:
"""Update the user scripts"""
self.user_scripts.clear()
self.tree_widget.clear()
script_files = self.get_script_files()
for key, files in script_files.items():
if len(files) == 0:
continue
top_item = self.tree_widget.add_top_item(key)
for fname in files:
mod_name = fname.split("/")[-1].strip(".py")
self.user_scripts[mod_name] = ScriptBlock(
fname=fname, module_name=mod_name, location=key
)
child_item = self.tree_widget.add_module_item(top_item, mod_name)
# pylint: disable=protected-access
self.reload_user_scripts(popup_error=True)
for user_script_name, info in self.client._scripts.items():
if info["fname"] == fname:
self.user_scripts[mod_name].user_script_name = user_script_name
_ = self.tree_widget.add_child_item(child_item, user_script_name)
self.tree_widget.expandAll()
@Slot(str)
def handle_edit_button_clicked(self, text: str):
"""Handle the click of the edit button"""
self.open_script(self.user_scripts[text].fname)
@Slot(str)
def handle_play_button_clicked(self, text: str):
"""Handle the click of the play button"""
self._console.execute_command("bec.load_all_user_scripts()")
info = self.client._scripts[text]
caller_args = inspect.getfullargspec(info["cls"])
args = caller_args.args + caller_args.kwonlyargs
if args:
self._handle_call_with_args(text, caller_args)
else:
self._console.execute_command(f"{text}()")
def _handle_call_with_args(self, text: str, caller_args: inspect.FullArgSpec) -> None:
"""Handle the call with arguments"""
defaults = []
args = caller_args.args + caller_args.kwonlyargs
for value in args:
if caller_args.kwonlydefaults is not None:
defaults.append(caller_args.kwonlydefaults.get(value, None))
fields = dict((arg, default) for arg, default in zip(args, defaults))
info = ", ".join([f"{k}={v}" for k, v in fields.items()]).replace("None", "")
info = f"Example: {text}({info})"
self._script_dialog = InputDialog(
parent=self, header="Script Arguments", info=info, fields=fields
)
if self._script_dialog.exec_():
args = self._script_dialog.get_inputs()
args = ", ".join([f"{k}={v}" for k, v in args.items()])
self._console.execute_command(f"{text}({args})")
self._script_dialog = None
def cleanup(self):
"""Cleanup the widget"""
self._vscode_editor.cleanup()
self._vscode_editor.deleteLater()
if self._code_dialog is not None:
self._code_dialog.deleteLater()
if self._script_dialog is not None:
self._script_dialog.deleteLater()
if self._new_script_dialog is not None:
self._new_script_dialog.deleteLater()
self.tree_widget.clear()
self._console.cleanup()
if __name__ == "__main__":
from qtpy.QtWidgets import QApplication
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
app = QApplication([])
set_theme("dark")
w = QWidget()
layout = QVBoxLayout(w)
layout.addWidget(UserScriptWidget())
w.setFixedHeight(400)
w.setFixedWidth(400)
w.show()
app.exec_()

View File

@@ -0,0 +1 @@
{'files': ['user_script.py']}

View File

@@ -0,0 +1,54 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.editors.user_script.user_script import UserScriptWidget
DOM_XML = """
<ui language='c++'>
<widget class='UserScriptWidget' name='user_script_widget'>
</widget>
</ui>
"""
class UserScriptWidgetPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
t = UserScriptWidget(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return "BEC Services"
def icon(self):
return designer_material_icon(UserScriptWidget.ICON_NAME)
def includeFile(self):
return "user_script_widget"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "UserScriptWidget"
def toolTip(self):
return "Dialog for displaying the fit summary and params for LMFit DAP processes"
def whatsThis(self):
return self.toolTip()

View File

@@ -124,16 +124,41 @@ class BECStatusBox(BECWidget, CompactPopupWidget):
self.tree = QTreeWidget(self)
self.tree.setHeaderHidden(True)
# TODO probably here is a problem still with setting the stylesheet
# self.tree.setStyleSheet(
# "QTreeWidget::item:!selected "
# "{ "
# "border: 1px solid gainsboro; "
# "border-left: none; "
# "border-top: none; "
# "}"
# "QTreeWidget::item:selected {}"
# )
self.tree.setStyleSheet(
"QTreeWidget::item:!selected "
"{ "
"QTreeWidget::item:!selected { "
"border: 1px solid gainsboro; "
"border-left: none; "
"border-top: none; "
"} "
"QTreeWidget::item:selected {} "
"QTreeView::branch { "
"border-image: none; "
"background: transparent; "
"} "
"QTreeView::branch:has-siblings:!adjoins-item { "
"border-image: none; "
"} "
"QTreeView::branch:has-children:!has-siblings:closed, "
"QTreeView::branch:closed:has-children:has-siblings { "
"border-image: none; "
"} "
"QTreeView::branch:open:has-children:!has-siblings, "
"QTreeView::branch:open:has-children:has-siblings { "
"border-image: none; "
"}"
"QTreeWidget::item:selected {}"
)
# self.tree.setRootIsDecorated(False)
def _create_status_widget(
self, service_name: str, status=BECStatus, info: dict = None, metrics: dict = None
) -> StatusItem:

View File

@@ -23,7 +23,7 @@ MODULE_PATH = os.path.dirname(bec_widgets.__file__)
class IconsEnum(enum.Enum):
"""Enum class for icons in the status item widget."""
RUNNING = "done_outline"
RUNNING = "check_circle"
BUSY = "progress_activity"
IDLE = "progress_activity"
ERROR = "emergency_home"

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

View File

@@ -0,0 +1,35 @@
(user.widgets.user_script_widget)=
# User Script Widget
````{tab} Overview
The [`UserScriptWidget`] is designed to allow users to run their user-defined scripts directly from a BEC GUI. This widget lists all available user scripts and allows users to execute them with a single click. The widget also provides an interface to open a VSCode editor to modify the files hosting the user scripts. This widget is particularly useful to provide a user-friendly interface to run custom scripts to users without using the command line. We note that the scripts are executed in a BEC client that does not share the full namespace with the BEC IPython kernel.
## Key Features:
- **User Script Execution**: Run user-defined scripts directly from the BEC GUI.
- **VSCode Integration**: Open the VSCode editor to modify the files hosting the user scripts.
````{tab} Examples
The `UserScriptWidget` widget can be integrated within a [`BECDockArea`](user.widgets.bec_dock_area) or used as an individual component in your application through `BECDesigner`. Below are examples demonstrating how to create and use the `BECStatusBox` widget.
## Example 1 - Adding BEC Status Box to BECDockArea
In this example, we demonstrate how to add a `BECStatusBox` widget to a `BECDockArea`, allowing users to monitor the status of BEC processes directly from the GUI.
```python
# Add a new dock with a BECStatusBox widget
user_script = gui.add_dock().add_widget("UserScriptWidget")
```
```{hint}
The widget will automatically display the list of available user scripts. Users can click on the script name to execute it.
```
````
````{tab} API
```{eval-rst}
.. include:: /api_reference/_autosummary/bec_widgets.cli.client.UserScriptWidget.rst
```
````

View File

@@ -134,6 +134,15 @@ Display status of BEC services.
Display current scan queue.
```
```{grid-item-card} User Script Widget
:link: user.widgets.user_script_widget
:link-type: ref
:img-top: /assets/widget_screenshots/user_script_widget.png
Run user-defined scripts directly from the BEC GUI.
```
````
## BEC Utility Widgets
@@ -238,6 +247,7 @@ Display DAP summaries of LMFit models in a window.
Select DAP model from a list of DAP processes.
```
````
```{toctree}
@@ -270,5 +280,6 @@ signal_input/signal_input.md
position_indicator/position_indicator.md
lmfit_dialog/lmfit_dialog.md
dap_combo_box/dap_combo_box.md
user_script_widget/user_script_widget.md
```

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "bec_widgets"
version = "1.6.0"
version = "1.7.0"
description = "BEC Widgets"
requires-python = ">=3.10"
classifiers = [

View File

@@ -0,0 +1,65 @@
import sys
import threading
import time
import pytest
from pygments.token import Token
from qtpy.QtCore import QEventLoop
from bec_widgets.utils.colors import apply_theme
from bec_widgets.widgets.editors.console.console import BECConsole
@pytest.fixture
def console_widget(qtbot):
apply_theme("light")
console = BECConsole()
console.set_cmd(sys.executable) # will launch Python interpreter
console.set_prompt_tokens((Token.Prompt, ">>>"))
qtbot.addWidget(console)
console.show()
qtbot.waitExposed(console)
yield console
console.terminate()
def test_console_widget(console_widget, qtbot, tmp_path):
def wait_prompt(command_to_execute=None, busy=False):
signal_waiter = QEventLoop()
def exit_loop(idle):
if busy and not idle:
signal_waiter.quit()
elif not busy and idle:
signal_waiter.quit()
console_widget.prompt.connect(exit_loop)
if command_to_execute:
if callable(command_to_execute):
command_to_execute()
else:
console_widget.execute_command(command_to_execute)
signal_waiter.exec_()
console_widget.start()
wait_prompt()
# use console to write something to a tmp file
tmp_filename = str(tmp_path / "console_test.txt")
wait_prompt(f"f = open('{tmp_filename}', 'wt'); f.write('HELLO CONSOLE'); f.close()")
# check the code has been executed by console, by checking the tmp file contents
with open(tmp_filename, "rt") as f:
assert f.read() == "HELLO CONSOLE"
# execute a sleep
t0 = time.perf_counter()
wait_prompt("import time; time.sleep(1)")
assert time.perf_counter() - t0 >= 1
# test ctrl-c
t0 = time.perf_counter()
wait_prompt("time.sleep(5)", busy=True)
wait_prompt(console_widget.send_ctrl_c)
assert (
time.perf_counter() - t0 < 1
) # in reality it will be almost immediate, but ok we can say less than 1 second compared to 5

View File

@@ -0,0 +1,119 @@
import inspect
from unittest import mock
import pytest
from qtpy.QtWidgets import QLabel
from bec_widgets.widgets.editors.user_script.user_script import UserScriptWidget
from .client_mocks import mocked_client
def dummy_script():
pass
def dummy_script_with_args(arg1: str, arg2: int = 0):
pass
@pytest.fixture
def SCRIPTS(tmp_path):
"""Create dummy script files"""
home_script = f"{tmp_path}/dummy_path_home_scripts/home_testing.py"
bec_script = f"{tmp_path}/dummy_path_bec_lib_scripts/bec_testing.py"
rtr = {
"dummy_script": {"cls": dummy_script, "fname": home_script},
"dummy_script_with_args": {"cls": dummy_script_with_args, "fname": bec_script},
}
return rtr
@pytest.fixture
def user_script_widget(SCRIPTS, qtbot, mocked_client):
mocked_client._scripts = SCRIPTS
files = {
"USER": [SCRIPTS["dummy_script"]["fname"]],
"BEC": [SCRIPTS["dummy_script_with_args"]["fname"]],
}
mock_console = mock.MagicMock()
mock_vscode = mock.MagicMock()
with mock.patch(
"bec_widgets.widgets.editors.user_script.user_script.UserScriptWidget.get_script_files",
return_value=files,
):
with mock.patch("bec_widgets.widgets.editors.user_script.user_script.VSCodeDialog.init_ui"):
widget = UserScriptWidget(
client=mocked_client, vs_code_editor=mock_vscode, bec_console=mock_console
)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_user_script_widget_start_up(SCRIPTS, user_script_widget):
"""Test init the user_script widget with dummy scripts from above"""
assert user_script_widget.tree_widget.columnCount() == 2
assert len(user_script_widget.tree_widget.children()[0].children()) == 6
assert user_script_widget.user_scripts["home_testing"].location == "USER"
assert user_script_widget.user_scripts["home_testing"].module_name == "home_testing"
assert user_script_widget.user_scripts["home_testing"].fname == SCRIPTS["dummy_script"]["fname"]
assert user_script_widget.user_scripts["home_testing"].user_script_name == dummy_script.__name__
assert user_script_widget.user_scripts["bec_testing"].location == "BEC"
assert user_script_widget.user_scripts["bec_testing"].module_name == "bec_testing"
assert (
user_script_widget.user_scripts["bec_testing"].fname
== SCRIPTS["dummy_script_with_args"]["fname"]
)
assert (
user_script_widget.user_scripts["bec_testing"].user_script_name
== dummy_script_with_args.__name__
)
for label in user_script_widget.tree_widget.children()[0].findChildren(QLabel):
assert label.text() in [
"home_testing",
"bec_testing",
"dummy_script",
"dummy_script_with_args",
]
def test_handle_open_script(SCRIPTS, user_script_widget):
"""Test handling open script"""
with mock.patch.object(user_script_widget, "open_script") as mock_open_script:
user_script_widget.handle_edit_button_clicked("home_testing")
fp = SCRIPTS["dummy_script"]["fname"]
mock_open_script.assert_called_once_with(fp)
def test_open_script(user_script_widget):
"""Test opening script"""
assert user_script_widget._code_dialog is None
# Override the _vscode_ed
with mock.patch.object(user_script_widget._vscode_editor, "show") as mock_show:
with mock.patch.object(user_script_widget._vscode_editor, "open_file") as mock_open_file:
with mock.patch.object(user_script_widget._vscode_editor, "zen_mode") as mock_zen_mode:
user_script_widget.open_script("/dummy_path_home_scripts/home_testing.py")
mock_show.assert_called_once()
mock_open_file.assert_called_once_with("/dummy_path_home_scripts/home_testing.py")
mock_zen_mode.assert_called_once()
assert user_script_widget._code_dialog is not None
def test_play_button(user_script_widget):
"""Test play button"""
with mock.patch.object(user_script_widget, "_console") as mock_console:
with mock.patch.object(user_script_widget, "_handle_call_with_args") as mock_handle_call:
# Test first with no args
user_script_widget.handle_play_button_clicked("dummy_script")
mock_console.execute_command.caller_args == [
mock.call("bec.load_all_user_scripts()"),
mock.call("dummy_script()"),
]
assert user_script_widget._script_dialog is None
# Test with args
user_script_widget.handle_play_button_clicked("dummy_script_with_args")
caller_args = inspect.getfullargspec(dummy_script_with_args)
assert mock_handle_call.call_args == mock.call("dummy_script_with_args", caller_args)