mirror of
https://github.com/bec-project/bec_widgets.git
synced 2026-04-16 05:30:54 +02:00
Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f68f072da3 | ||
| 1df6c1925b | |||
| 6b939ac34d | |||
|
|
6bcf20af07 | ||
| a64cf0dd87 | |||
| cd4e90a79f | |||
|
|
49a96a18d6 | ||
| 2b4454a291 | |||
| d12bd9fe1a | |||
| d0c1ac0cf5 | |||
| f90150d1c7 |
10
.github/workflows/end2end-conda.yml
vendored
10
.github/workflows/end2end-conda.yml
vendored
@@ -47,4 +47,12 @@ jobs:
|
||||
source ./bin/install_bec_dev.sh -t
|
||||
cd ../
|
||||
pip install -e ./ophyd_devices -e .[dev,pyside6] -e ./bec_testing_plugin
|
||||
pytest -v --files-path ./ --start-servers --random-order ./tests/end-2-end
|
||||
pytest -v --files-path ./ --start-servers --random-order ./tests/end-2-end
|
||||
|
||||
- name: Upload logs if job fails
|
||||
if: failure()
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: pytest-logs
|
||||
path: ./logs/*.log
|
||||
retention-days: 7
|
||||
9
.github/workflows/formatter.yml
vendored
9
.github/workflows/formatter.yml
vendored
@@ -14,10 +14,15 @@ jobs:
|
||||
|
||||
- name: Run black and isort
|
||||
run: |
|
||||
pip install black isort
|
||||
pip install -e .[dev]
|
||||
pip install uv
|
||||
uv pip install --system black isort
|
||||
uv pip install --system -e .[dev]
|
||||
black --check --diff --color .
|
||||
isort --check --diff ./
|
||||
|
||||
- name: Check for disallowed imports from PySide
|
||||
run: '! grep -re "from PySide6\." bec_widgets/ | grep -v -e "PySide6.QtDesigner" -e "PySide6.scripts"'
|
||||
|
||||
Pylint:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
|
||||
47
CHANGELOG.md
47
CHANGELOG.md
@@ -1,6 +1,53 @@
|
||||
# CHANGELOG
|
||||
|
||||
|
||||
## v2.10.2 (2025-06-03)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
- Remove unnecessary PySide imports
|
||||
([`1df6c19`](https://github.com/bec-project/bec_widgets/commit/1df6c1925b6ec88df8d7a1a5a79a5ddc6b1161b5))
|
||||
|
||||
### Continuous Integration
|
||||
|
||||
- Check for disallowed imports from PySide
|
||||
([`6b939ac`](https://github.com/bec-project/bec_widgets/commit/6b939ac34d01cdbb0e8e32a0bd4e56cad032e75b))
|
||||
|
||||
|
||||
## v2.10.1 (2025-06-02)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
- **console**: Qt console widget deleted
|
||||
([`cd4e90a`](https://github.com/bec-project/bec_widgets/commit/cd4e90a79fcdbc96f4ec23db22375d05a48731db))
|
||||
|
||||
### Build System
|
||||
|
||||
- Pyte removed from dependencies
|
||||
([`a64cf0d`](https://github.com/bec-project/bec_widgets/commit/a64cf0dd871c1419e02d3803c74cc45966baac19))
|
||||
|
||||
|
||||
## v2.10.0 (2025-06-02)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
- **waveform**: Waveform only update async data when scan is currently running
|
||||
([`f90150d`](https://github.com/bec-project/bec_widgets/commit/f90150d1c708331d4ee78f82ebf5ef23cd81fd17))
|
||||
|
||||
### Continuous Integration
|
||||
|
||||
- Add job logs to e2e test
|
||||
([`d12bd9f`](https://github.com/bec-project/bec_widgets/commit/d12bd9fe1a010babc94dc86405d1b75a2b07534c))
|
||||
|
||||
- Fix artifact version
|
||||
([`2b4454a`](https://github.com/bec-project/bec_widgets/commit/2b4454a291bc69399ddd08780c44e1339825fb36))
|
||||
|
||||
### Features
|
||||
|
||||
- **waveform**: Large async dataset warning popup
|
||||
([`d0c1ac0`](https://github.com/bec-project/bec_widgets/commit/d0c1ac0cf5d421d14c9e050ccf5832cd30ca0764))
|
||||
|
||||
|
||||
## v2.9.2 (2025-05-30)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
@@ -3970,6 +3970,48 @@ class Waveform(RPCBase):
|
||||
The color palette of the figure widget.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def skip_large_dataset_warning(self) -> "bool":
|
||||
"""
|
||||
Whether to skip the large dataset warning when fetching async data.
|
||||
"""
|
||||
|
||||
@skip_large_dataset_warning.setter
|
||||
@rpc_call
|
||||
def skip_large_dataset_warning(self) -> "bool":
|
||||
"""
|
||||
Whether to skip the large dataset warning when fetching async data.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def skip_large_dataset_check(self) -> "bool":
|
||||
"""
|
||||
Whether to skip the large dataset warning when fetching async data.
|
||||
"""
|
||||
|
||||
@skip_large_dataset_check.setter
|
||||
@rpc_call
|
||||
def skip_large_dataset_check(self) -> "bool":
|
||||
"""
|
||||
Whether to skip the large dataset warning when fetching async data.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def max_dataset_size_mb(self) -> "float":
|
||||
"""
|
||||
The maximum dataset size (in MB) permitted when fetching async data from history before prompting the user.
|
||||
"""
|
||||
|
||||
@max_dataset_size_mb.setter
|
||||
@rpc_call
|
||||
def max_dataset_size_mb(self) -> "float":
|
||||
"""
|
||||
The maximum dataset size (in MB) permitted when fetching async data from history before prompting the user.
|
||||
"""
|
||||
|
||||
@rpc_call
|
||||
def plot(
|
||||
self,
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from bec_lib.logger import bec_logger
|
||||
from PySide6.QtGui import QCloseEvent
|
||||
from qtpy.QtGui import QCloseEvent
|
||||
from qtpy.QtWidgets import QDialog, QDialogButtonBox, QHBoxLayout, QPushButton, QVBoxLayout, QWidget
|
||||
|
||||
from bec_widgets.utils.error_popups import SafeSlot
|
||||
|
||||
@@ -9,7 +9,7 @@ from bec_widgets.utils.plugin_utils import get_custom_classes
|
||||
logger = bec_logger.logger
|
||||
|
||||
if PYSIDE6:
|
||||
from PySide6.QtUiTools import QUiLoader
|
||||
from qtpy.QtUiTools import QUiLoader
|
||||
|
||||
class CustomUiLoader(QUiLoader):
|
||||
def __init__(self, baseinstance, custom_widgets: dict | None = None):
|
||||
|
||||
@@ -1,870 +0,0 @@
|
||||
"""
|
||||
BECConsole is a Qt widget that runs a Bash shell.
|
||||
|
||||
BECConsole VT100 emulation is powered by Pyte,
|
||||
(https://github.com/selectel/pyte).
|
||||
"""
|
||||
|
||||
import collections
|
||||
import fcntl
|
||||
import html
|
||||
import os
|
||||
import pty
|
||||
import re
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
|
||||
import pyte
|
||||
from pygments.token import Token
|
||||
from pyte.screens import History
|
||||
from qtpy import QtCore, QtGui, QtWidgets
|
||||
from qtpy.QtCore import Property as pyqtProperty
|
||||
from qtpy.QtCore import QSize, QSocketNotifier, Qt, QTimer
|
||||
from qtpy.QtCore import Signal as pyqtSignal
|
||||
from qtpy.QtGui import QClipboard, QColor, QPalette, QTextCursor
|
||||
from qtpy.QtWidgets import QApplication, QHBoxLayout, QScrollBar, QSizePolicy
|
||||
|
||||
from bec_widgets.utils.error_popups import SafeSlot as Slot
|
||||
|
||||
ansi_colors = {
|
||||
"black": "#000000",
|
||||
"red": "#CD0000",
|
||||
"green": "#00CD00",
|
||||
"brown": "#996633", # Brown, replacing the yellow
|
||||
"blue": "#0000EE",
|
||||
"magenta": "#CD00CD",
|
||||
"cyan": "#00CDCD",
|
||||
"white": "#E5E5E5",
|
||||
"brightblack": "#7F7F7F",
|
||||
"brightred": "#FF0000",
|
||||
"brightgreen": "#00FF00",
|
||||
"brightyellow": "#FFFF00",
|
||||
"brightblue": "#5C5CFF",
|
||||
"brightmagenta": "#FF00FF",
|
||||
"brightcyan": "#00FFFF",
|
||||
"brightwhite": "#FFFFFF",
|
||||
}
|
||||
|
||||
control_keys_mapping = {
|
||||
QtCore.Qt.Key_A: b"\x01", # Ctrl-A
|
||||
QtCore.Qt.Key_B: b"\x02", # Ctrl-B
|
||||
QtCore.Qt.Key_C: b"\x03", # Ctrl-C
|
||||
QtCore.Qt.Key_D: b"\x04", # Ctrl-D
|
||||
QtCore.Qt.Key_E: b"\x05", # Ctrl-E
|
||||
QtCore.Qt.Key_F: b"\x06", # Ctrl-F
|
||||
QtCore.Qt.Key_G: b"\x07", # Ctrl-G (Bell)
|
||||
QtCore.Qt.Key_H: b"\x08", # Ctrl-H (Backspace)
|
||||
QtCore.Qt.Key_I: b"\x09", # Ctrl-I (Tab)
|
||||
QtCore.Qt.Key_J: b"\x0a", # Ctrl-J (Line Feed)
|
||||
QtCore.Qt.Key_K: b"\x0b", # Ctrl-K (Vertical Tab)
|
||||
QtCore.Qt.Key_L: b"\x0c", # Ctrl-L (Form Feed)
|
||||
QtCore.Qt.Key_M: b"\x0d", # Ctrl-M (Carriage Return)
|
||||
QtCore.Qt.Key_N: b"\x0e", # Ctrl-N
|
||||
QtCore.Qt.Key_O: b"\x0f", # Ctrl-O
|
||||
QtCore.Qt.Key_P: b"\x10", # Ctrl-P
|
||||
QtCore.Qt.Key_Q: b"\x11", # Ctrl-Q
|
||||
QtCore.Qt.Key_R: b"\x12", # Ctrl-R
|
||||
QtCore.Qt.Key_S: b"\x13", # Ctrl-S
|
||||
QtCore.Qt.Key_T: b"\x14", # Ctrl-T
|
||||
QtCore.Qt.Key_U: b"\x15", # Ctrl-U
|
||||
QtCore.Qt.Key_V: b"\x16", # Ctrl-V
|
||||
QtCore.Qt.Key_W: b"\x17", # Ctrl-W
|
||||
QtCore.Qt.Key_X: b"\x18", # Ctrl-X
|
||||
QtCore.Qt.Key_Y: b"\x19", # Ctrl-Y
|
||||
QtCore.Qt.Key_Z: b"\x1a", # Ctrl-Z
|
||||
QtCore.Qt.Key_Escape: b"\x1b", # Ctrl-Escape
|
||||
QtCore.Qt.Key_Backslash: b"\x1c", # Ctrl-\
|
||||
QtCore.Qt.Key_Underscore: b"\x1f", # Ctrl-_
|
||||
}
|
||||
|
||||
normal_keys_mapping = {
|
||||
QtCore.Qt.Key_Return: b"\n",
|
||||
QtCore.Qt.Key_Space: b" ",
|
||||
QtCore.Qt.Key_Enter: b"\n",
|
||||
QtCore.Qt.Key_Tab: b"\t",
|
||||
QtCore.Qt.Key_Backspace: b"\x08",
|
||||
QtCore.Qt.Key_Home: b"\x47",
|
||||
QtCore.Qt.Key_End: b"\x4f",
|
||||
QtCore.Qt.Key_Left: b"\x02",
|
||||
QtCore.Qt.Key_Up: b"\x10",
|
||||
QtCore.Qt.Key_Right: b"\x06",
|
||||
QtCore.Qt.Key_Down: b"\x0e",
|
||||
QtCore.Qt.Key_PageUp: b"\x49",
|
||||
QtCore.Qt.Key_PageDown: b"\x51",
|
||||
QtCore.Qt.Key_F1: b"\x1b\x31",
|
||||
QtCore.Qt.Key_F2: b"\x1b\x32",
|
||||
QtCore.Qt.Key_F3: b"\x1b\x33",
|
||||
QtCore.Qt.Key_F4: b"\x1b\x34",
|
||||
QtCore.Qt.Key_F5: b"\x1b\x35",
|
||||
QtCore.Qt.Key_F6: b"\x1b\x36",
|
||||
QtCore.Qt.Key_F7: b"\x1b\x37",
|
||||
QtCore.Qt.Key_F8: b"\x1b\x38",
|
||||
QtCore.Qt.Key_F9: b"\x1b\x39",
|
||||
QtCore.Qt.Key_F10: b"\x1b\x30",
|
||||
QtCore.Qt.Key_F11: b"\x45",
|
||||
QtCore.Qt.Key_F12: b"\x46",
|
||||
}
|
||||
|
||||
|
||||
def QtKeyToAscii(event):
|
||||
"""
|
||||
Convert the Qt key event to the corresponding ASCII sequence for
|
||||
the terminal. This works fine for standard alphanumerical characters, but
|
||||
most other characters require terminal specific control sequences.
|
||||
|
||||
The conversion below works for TERM="linux" terminals.
|
||||
"""
|
||||
if sys.platform == "darwin":
|
||||
# special case for MacOS
|
||||
# /!\ Qt maps ControlModifier to CMD
|
||||
# CMD-C, CMD-V for copy/paste
|
||||
# CTRL-C and other modifiers -> key mapping
|
||||
if event.modifiers() == QtCore.Qt.MetaModifier:
|
||||
if event.key() == Qt.Key_Backspace:
|
||||
return control_keys_mapping.get(Qt.Key_W)
|
||||
return control_keys_mapping.get(event.key())
|
||||
elif event.modifiers() == QtCore.Qt.ControlModifier:
|
||||
if event.key() == Qt.Key_C:
|
||||
# copy
|
||||
return "copy"
|
||||
elif event.key() == Qt.Key_V:
|
||||
# paste
|
||||
return "paste"
|
||||
return None
|
||||
else:
|
||||
return normal_keys_mapping.get(event.key(), event.text().encode("utf8"))
|
||||
if event.modifiers() == QtCore.Qt.ControlModifier:
|
||||
return control_keys_mapping.get(event.key())
|
||||
else:
|
||||
return normal_keys_mapping.get(event.key(), event.text().encode("utf8"))
|
||||
|
||||
|
||||
class Screen(pyte.HistoryScreen):
|
||||
def __init__(self, stdin_fd, cols, rows, historyLength):
|
||||
super().__init__(cols, rows, historyLength, ratio=1 / rows)
|
||||
self._fd = stdin_fd
|
||||
|
||||
def write_process_input(self, data):
|
||||
"""Response to CPR request (for example),
|
||||
this can be for other requests
|
||||
"""
|
||||
try:
|
||||
os.write(self._fd, data.encode("utf-8"))
|
||||
except (IOError, OSError):
|
||||
pass
|
||||
|
||||
def resize(self, lines, columns):
|
||||
lines = lines or self.lines
|
||||
columns = columns or self.columns
|
||||
|
||||
if lines == self.lines and columns == self.columns:
|
||||
return # No changes.
|
||||
|
||||
self.dirty.clear()
|
||||
self.dirty.update(range(lines))
|
||||
|
||||
self.save_cursor()
|
||||
if lines < self.lines:
|
||||
if lines <= self.cursor.y:
|
||||
nlines_to_move_up = self.lines - lines
|
||||
for i in range(nlines_to_move_up):
|
||||
line = self.buffer[i] # .pop(0)
|
||||
self.history.top.append(line)
|
||||
self.cursor_position(0, 0)
|
||||
self.delete_lines(nlines_to_move_up)
|
||||
self.restore_cursor()
|
||||
self.cursor.y -= nlines_to_move_up
|
||||
else:
|
||||
self.restore_cursor()
|
||||
|
||||
self.lines, self.columns = lines, columns
|
||||
self.history = History(
|
||||
self.history.top,
|
||||
self.history.bottom,
|
||||
1 / self.lines,
|
||||
self.history.size,
|
||||
self.history.position,
|
||||
)
|
||||
self.set_margins()
|
||||
|
||||
|
||||
class Backend(QtCore.QObject):
|
||||
"""
|
||||
Poll Bash.
|
||||
|
||||
This class will run as a qsocketnotifier (started in ``_TerminalWidget``) and poll the
|
||||
file descriptor of the Bash terminal.
|
||||
"""
|
||||
|
||||
# Signals to communicate with ``_TerminalWidget``.
|
||||
dataReady = pyqtSignal(object)
|
||||
processExited = pyqtSignal()
|
||||
|
||||
def __init__(self, fd, cols, rows):
|
||||
super().__init__()
|
||||
|
||||
# File descriptor that connects to Bash process.
|
||||
self.fd = fd
|
||||
|
||||
# Setup Pyte (hard coded display size for now).
|
||||
self.screen = Screen(self.fd, cols, rows, 10000)
|
||||
self.stream = pyte.ByteStream()
|
||||
self.stream.attach(self.screen)
|
||||
|
||||
self.notifier = QSocketNotifier(fd, QSocketNotifier.Read)
|
||||
self.notifier.activated.connect(self._fd_readable)
|
||||
|
||||
def _fd_readable(self):
|
||||
"""
|
||||
Poll the Bash output, run it through Pyte, and notify
|
||||
"""
|
||||
# Read the shell output until the file descriptor is closed.
|
||||
try:
|
||||
out = os.read(self.fd, 2**16)
|
||||
except OSError:
|
||||
self.processExited.emit()
|
||||
self.notifier.setEnabled(False)
|
||||
return
|
||||
|
||||
# Feed output into Pyte's state machine and send the new screen
|
||||
# output to the GUI
|
||||
self.stream.feed(out)
|
||||
self.dataReady.emit(self.screen)
|
||||
|
||||
|
||||
class BECConsole(QtWidgets.QWidget):
|
||||
"""Container widget for the terminal text area"""
|
||||
|
||||
PLUGIN = True
|
||||
ICON_NAME = "terminal"
|
||||
|
||||
prompt = pyqtSignal(bool)
|
||||
|
||||
def __init__(self, parent=None, cols=132):
|
||||
super().__init__(parent)
|
||||
|
||||
self.term = _TerminalWidget(self, cols, rows=43)
|
||||
self.term.prompt.connect(self.prompt) # forward signal from term to this widget
|
||||
|
||||
self.scroll_bar = QScrollBar(Qt.Vertical, self)
|
||||
# self.scroll_bar.hide()
|
||||
layout = QHBoxLayout(self)
|
||||
layout.addWidget(self.term)
|
||||
layout.addWidget(self.scroll_bar)
|
||||
layout.setAlignment(Qt.AlignLeft | Qt.AlignTop)
|
||||
layout.setContentsMargins(0, 0, 0, 0)
|
||||
self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Expanding)
|
||||
|
||||
pal = QPalette()
|
||||
self.set_bgcolor(pal.window().color())
|
||||
self.set_fgcolor(pal.windowText().color())
|
||||
self.term.set_scroll_bar(self.scroll_bar)
|
||||
self.set_cmd("bec --nogui")
|
||||
|
||||
self._check_designer_timer = QTimer()
|
||||
self._check_designer_timer.timeout.connect(self.check_designer)
|
||||
self._check_designer_timer.start(1000)
|
||||
|
||||
def minimumSizeHint(self):
|
||||
size = self.term.sizeHint()
|
||||
size.setWidth(size.width() + self.scroll_bar.width())
|
||||
return size
|
||||
|
||||
def sizeHint(self):
|
||||
return self.minimumSizeHint()
|
||||
|
||||
def check_designer(self, calls={"n": 0}):
|
||||
calls["n"] += 1
|
||||
if self.term.fd is not None:
|
||||
# already started
|
||||
self._check_designer_timer.stop()
|
||||
elif self.window().windowTitle().endswith("[Preview]"):
|
||||
# assuming Designer preview -> start
|
||||
self._check_designer_timer.stop()
|
||||
self.term.start()
|
||||
elif calls["n"] >= 3:
|
||||
# assuming not in Designer -> stop checking
|
||||
self._check_designer_timer.stop()
|
||||
|
||||
def get_rows(self):
|
||||
return self.term.rows
|
||||
|
||||
def set_rows(self, rows):
|
||||
self.term.rows = rows
|
||||
self.adjustSize()
|
||||
self.updateGeometry()
|
||||
|
||||
def get_cols(self):
|
||||
return self.term.cols
|
||||
|
||||
def set_cols(self, cols):
|
||||
self.term.cols = cols
|
||||
self.adjustSize()
|
||||
self.updateGeometry()
|
||||
|
||||
def get_bgcolor(self):
|
||||
return QColor.fromString(self.term.bg_color)
|
||||
|
||||
def set_bgcolor(self, color):
|
||||
self.term.bg_color = color.name(QColor.HexRgb)
|
||||
|
||||
def get_fgcolor(self):
|
||||
return QColor.fromString(self.term.fg_color)
|
||||
|
||||
def set_fgcolor(self, color):
|
||||
self.term.fg_color = color.name(QColor.HexRgb)
|
||||
|
||||
def get_cmd(self):
|
||||
return self.term._cmd
|
||||
|
||||
def set_cmd(self, cmd):
|
||||
self.term._cmd = cmd
|
||||
if self.term.fd is None:
|
||||
# not started yet
|
||||
self.term.clear()
|
||||
self.term.appendHtml(f"<h2>BEC Console - {repr(cmd)}</h2>")
|
||||
|
||||
def start(self, deactivate_ctrl_d=True):
|
||||
self.term.start(deactivate_ctrl_d=deactivate_ctrl_d)
|
||||
|
||||
def push(self, text, hit_return=False):
|
||||
"""Push some text to the terminal"""
|
||||
return self.term.push(text, hit_return=hit_return)
|
||||
|
||||
def execute_command(self, command):
|
||||
self.push(command, hit_return=True)
|
||||
|
||||
def set_prompt_tokens(self, *tokens):
|
||||
"""Prepare regexp to identify prompt, based on tokens
|
||||
|
||||
Tokens are returned from get_ipython().prompts.in_prompt_tokens()
|
||||
"""
|
||||
regex_parts = []
|
||||
for token_type, token_value in tokens:
|
||||
if token_type == Token.PromptNum: # Handle dynamic prompt number
|
||||
regex_parts.append(r"[\d\?]+") # Match one or more digits or '?'
|
||||
else:
|
||||
# Escape other prompt parts (e.g., "In [", "]: ")
|
||||
if not token_value:
|
||||
regex_parts.append(".+?") # arbitrary string
|
||||
else:
|
||||
regex_parts.append(re.escape(token_value))
|
||||
|
||||
# Combine into a single regex
|
||||
prompt_pattern = "".join(regex_parts)
|
||||
self.term._prompt_re = re.compile(prompt_pattern + r"\s*$")
|
||||
|
||||
def terminate(self, timeout=10):
|
||||
self.term.stop(timeout=timeout)
|
||||
|
||||
def send_ctrl_c(self, timeout=None):
|
||||
self.term.send_ctrl_c(timeout)
|
||||
|
||||
cols = pyqtProperty(int, get_cols, set_cols)
|
||||
rows = pyqtProperty(int, get_rows, set_rows)
|
||||
bgcolor = pyqtProperty(QColor, get_bgcolor, set_bgcolor)
|
||||
fgcolor = pyqtProperty(QColor, get_fgcolor, set_fgcolor)
|
||||
cmd = pyqtProperty(str, get_cmd, set_cmd)
|
||||
|
||||
|
||||
class _TerminalWidget(QtWidgets.QPlainTextEdit):
|
||||
"""
|
||||
Start ``Backend`` process and render Pyte output as text.
|
||||
"""
|
||||
|
||||
prompt = pyqtSignal(bool)
|
||||
|
||||
def __init__(self, parent, cols=125, rows=50, **kwargs):
|
||||
# regexp to match prompt
|
||||
self._prompt_re = None
|
||||
# last prompt
|
||||
self._prompt_str = None
|
||||
# process pid
|
||||
self.pid = None
|
||||
# file descriptor to communicate with the subprocess
|
||||
self.fd = None
|
||||
self.backend = None
|
||||
# command to execute
|
||||
self._cmd = ""
|
||||
# should ctrl-d be deactivated ? (prevent Python exit)
|
||||
self._deactivate_ctrl_d = False
|
||||
|
||||
# Default colors
|
||||
pal = QPalette()
|
||||
self._fg_color = pal.text().color().name()
|
||||
self._bg_color = pal.base().color().name()
|
||||
|
||||
# Specify the terminal size in terms of lines and columns.
|
||||
self._rows = rows
|
||||
self._cols = cols
|
||||
self.output = collections.deque()
|
||||
|
||||
super().__init__(parent)
|
||||
|
||||
self.setSizePolicy(QSizePolicy.MinimumExpanding, QSizePolicy.Expanding)
|
||||
|
||||
# Disable default scrollbars (we use our own, to be set via .set_scroll_bar())
|
||||
self.setVerticalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
|
||||
self.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
|
||||
self.scroll_bar = None
|
||||
|
||||
# Use Monospace fonts and disable line wrapping.
|
||||
self.setFont(QtGui.QFont("Courier", 9))
|
||||
self.setFont(QtGui.QFont("Monospace"))
|
||||
self.setLineWrapMode(QtWidgets.QPlainTextEdit.NoWrap)
|
||||
fmt = QtGui.QFontMetrics(self.font())
|
||||
char_width = fmt.width("w")
|
||||
self.setCursorWidth(char_width)
|
||||
|
||||
self.adjustSize()
|
||||
self.updateGeometry()
|
||||
self.update_stylesheet()
|
||||
|
||||
@property
|
||||
def bg_color(self):
|
||||
return self._bg_color
|
||||
|
||||
@bg_color.setter
|
||||
def bg_color(self, hexcolor):
|
||||
self._bg_color = hexcolor
|
||||
self.update_stylesheet()
|
||||
|
||||
@property
|
||||
def fg_color(self):
|
||||
return self._fg_color
|
||||
|
||||
@fg_color.setter
|
||||
def fg_color(self, hexcolor):
|
||||
self._fg_color = hexcolor
|
||||
self.update_stylesheet()
|
||||
|
||||
def update_stylesheet(self):
|
||||
self.setStyleSheet(
|
||||
f"QPlainTextEdit {{ border: 0; color: {self._fg_color}; background-color: {self._bg_color}; }} "
|
||||
)
|
||||
|
||||
@property
|
||||
def rows(self):
|
||||
return self._rows
|
||||
|
||||
@rows.setter
|
||||
def rows(self, rows: int):
|
||||
if self.backend is None:
|
||||
# not initialized yet, ok to change
|
||||
self._rows = rows
|
||||
self.adjustSize()
|
||||
self.updateGeometry()
|
||||
else:
|
||||
raise RuntimeError("Cannot change rows after console is started.")
|
||||
|
||||
@property
|
||||
def cols(self):
|
||||
return self._cols
|
||||
|
||||
@cols.setter
|
||||
def cols(self, cols: int):
|
||||
if self.fd is None:
|
||||
# not initialized yet, ok to change
|
||||
self._cols = cols
|
||||
self.adjustSize()
|
||||
self.updateGeometry()
|
||||
else:
|
||||
raise RuntimeError("Cannot change cols after console is started.")
|
||||
|
||||
def start(self, deactivate_ctrl_d: bool = False):
|
||||
self._deactivate_ctrl_d = deactivate_ctrl_d
|
||||
|
||||
self.update_term_size()
|
||||
|
||||
# Start the Bash process
|
||||
self.pid, self.fd = self.fork_shell()
|
||||
|
||||
if self.fd:
|
||||
# Create the ``Backend`` object
|
||||
self.backend = Backend(self.fd, self.cols, self.rows)
|
||||
self.backend.dataReady.connect(self.data_ready)
|
||||
self.backend.processExited.connect(self.process_exited)
|
||||
else:
|
||||
self.process_exited()
|
||||
|
||||
def process_exited(self):
|
||||
self.fd = None
|
||||
self.clear()
|
||||
self.appendHtml(f"<br><h2>{repr(self._cmd)} - Process exited.</h2>")
|
||||
self.setReadOnly(True)
|
||||
|
||||
def send_ctrl_c(self, wait_prompt=True, timeout=None):
|
||||
"""Send CTRL-C to the process
|
||||
|
||||
If wait_prompt=True (default), wait for a new prompt after CTRL-C
|
||||
If no prompt is displayed after 'timeout' seconds, TimeoutError is raised
|
||||
"""
|
||||
os.kill(self.pid, signal.SIGINT)
|
||||
if wait_prompt:
|
||||
timeout_error = False
|
||||
if timeout:
|
||||
|
||||
def set_timeout_error():
|
||||
nonlocal timeout_error
|
||||
timeout_error = True
|
||||
|
||||
timeout_timer = QTimer()
|
||||
timeout_timer.singleShot(timeout * 1000, set_timeout_error)
|
||||
while self._prompt_str is None:
|
||||
QApplication.instance().process_events()
|
||||
if timeout_error:
|
||||
raise TimeoutError(
|
||||
f"CTRL-C: could not get back to prompt after {timeout} seconds."
|
||||
)
|
||||
|
||||
def _is_running(self):
|
||||
if os.waitpid(self.pid, os.WNOHANG) == (0, 0):
|
||||
return True
|
||||
return False
|
||||
|
||||
def stop(self, kill=True, timeout=None):
|
||||
"""Stop the running process
|
||||
|
||||
SIGTERM is the default signal for terminating processes.
|
||||
|
||||
If kill=True (default), SIGKILL will be sent if the process does not exit after timeout
|
||||
"""
|
||||
# try to exit gracefully
|
||||
os.kill(self.pid, signal.SIGTERM)
|
||||
|
||||
# wait until process is truly dead
|
||||
t0 = time.perf_counter()
|
||||
while self._is_running():
|
||||
time.sleep(1)
|
||||
if timeout is not None and time.perf_counter() - t0 > timeout:
|
||||
# still alive after 'timeout' seconds
|
||||
if kill:
|
||||
# send SIGKILL and make a last check in loop
|
||||
os.kill(self.pid, signal.SIGKILL)
|
||||
kill = False
|
||||
else:
|
||||
# still running after timeout...
|
||||
raise TimeoutError(
|
||||
f"Could not terminate process with pid: {self.pid} within timeout"
|
||||
)
|
||||
self.process_exited()
|
||||
|
||||
def data_ready(self, screen):
|
||||
"""Handle new screen: redraw, set scroll bar max and slider, move cursor to its position
|
||||
|
||||
This method is triggered via a signal from ``Backend``.
|
||||
"""
|
||||
self.redraw_screen()
|
||||
self.adjust_scroll_bar()
|
||||
self.move_cursor()
|
||||
|
||||
def minimumSizeHint(self):
|
||||
"""Return minimum size for current cols and rows"""
|
||||
fmt = QtGui.QFontMetrics(self.font())
|
||||
char_width = fmt.width("w")
|
||||
char_height = fmt.height()
|
||||
width = char_width * self.cols
|
||||
height = char_height * self.rows
|
||||
return QSize(width, height)
|
||||
|
||||
def sizeHint(self):
|
||||
return self.minimumSizeHint()
|
||||
|
||||
def set_scroll_bar(self, scroll_bar):
|
||||
self.scroll_bar = scroll_bar
|
||||
self.scroll_bar.setMinimum(0)
|
||||
self.scroll_bar.valueChanged.connect(self.scroll_value_change)
|
||||
|
||||
def scroll_value_change(self, value, old={"value": -1}):
|
||||
if self.backend is None:
|
||||
return
|
||||
if old["value"] == -1:
|
||||
old["value"] = self.scroll_bar.maximum()
|
||||
if value <= old["value"]:
|
||||
# scroll up
|
||||
# value is number of lines from the start
|
||||
nlines = old["value"] - value
|
||||
# history ratio gives prev_page == 1 line
|
||||
for i in range(nlines):
|
||||
self.backend.screen.prev_page()
|
||||
else:
|
||||
# scroll down
|
||||
nlines = value - old["value"]
|
||||
for i in range(nlines):
|
||||
self.backend.screen.next_page()
|
||||
old["value"] = value
|
||||
self.redraw_screen()
|
||||
|
||||
def adjust_scroll_bar(self):
|
||||
sb = self.scroll_bar
|
||||
sb.valueChanged.disconnect(self.scroll_value_change)
|
||||
tmp = len(self.backend.screen.history.top) + len(self.backend.screen.history.bottom)
|
||||
sb.setMaximum(tmp if tmp > 0 else 0)
|
||||
sb.setSliderPosition(tmp if tmp > 0 else 0)
|
||||
# if tmp > 0:
|
||||
# # show scrollbar, but delayed - prevent recursion with widget size change
|
||||
# QTimer.singleShot(0, scrollbar.show)
|
||||
# else:
|
||||
# QTimer.singleShot(0, scrollbar.hide)
|
||||
sb.valueChanged.connect(self.scroll_value_change)
|
||||
|
||||
def write(self, data):
|
||||
try:
|
||||
os.write(self.fd, data)
|
||||
except (IOError, OSError):
|
||||
self.process_exited()
|
||||
|
||||
@Slot(object)
|
||||
def keyPressEvent(self, event):
|
||||
"""
|
||||
Redirect all keystrokes to the terminal process.
|
||||
"""
|
||||
if self.fd is None:
|
||||
# not started
|
||||
return
|
||||
# Convert the Qt key to the correct ASCII code.
|
||||
if (
|
||||
self._deactivate_ctrl_d
|
||||
and event.modifiers() == QtCore.Qt.ControlModifier
|
||||
and event.key() == QtCore.Qt.Key_D
|
||||
):
|
||||
return None
|
||||
|
||||
code = QtKeyToAscii(event)
|
||||
if code == "copy":
|
||||
# MacOS only: CMD-C handling
|
||||
self.copy()
|
||||
elif code == "paste":
|
||||
# MacOS only: CMD-V handling
|
||||
self._push_clipboard()
|
||||
elif code is not None:
|
||||
self.write(code)
|
||||
|
||||
def push(self, text, hit_return=False):
|
||||
"""
|
||||
Write 'text' to terminal
|
||||
"""
|
||||
self.write(text.encode("utf-8"))
|
||||
if hit_return:
|
||||
self.write(b"\n")
|
||||
|
||||
def contextMenuEvent(self, event):
|
||||
if self.fd is None:
|
||||
return
|
||||
menu = self.createStandardContextMenu()
|
||||
for action in menu.actions():
|
||||
# remove all actions except copy and paste
|
||||
if "opy" in action.text():
|
||||
# redefine text without shortcut
|
||||
# since it probably clashes with control codes (like CTRL-C etc)
|
||||
action.setText("Copy")
|
||||
continue
|
||||
if "aste" in action.text():
|
||||
# redefine text without shortcut
|
||||
action.setText("Paste")
|
||||
# paste -> have to insert with self.push
|
||||
action.triggered.connect(self._push_clipboard)
|
||||
continue
|
||||
menu.removeAction(action)
|
||||
menu.exec_(event.globalPos())
|
||||
|
||||
def _push_clipboard(self):
|
||||
clipboard = QApplication.instance().clipboard()
|
||||
self.push(clipboard.text())
|
||||
|
||||
def move_cursor(self):
|
||||
textCursor = self.textCursor()
|
||||
textCursor.setPosition(0)
|
||||
textCursor.movePosition(
|
||||
QTextCursor.Down, QTextCursor.MoveAnchor, self.backend.screen.cursor.y
|
||||
)
|
||||
textCursor.movePosition(
|
||||
QTextCursor.Right, QTextCursor.MoveAnchor, self.backend.screen.cursor.x
|
||||
)
|
||||
self.setTextCursor(textCursor)
|
||||
|
||||
def mouseReleaseEvent(self, event):
|
||||
if self.fd is None:
|
||||
return
|
||||
if event.button() == Qt.MiddleButton:
|
||||
# push primary selection buffer ("mouse clipboard") to terminal
|
||||
clipboard = QApplication.instance().clipboard()
|
||||
if clipboard.supportsSelection():
|
||||
self.push(clipboard.text(QClipboard.Selection))
|
||||
return None
|
||||
elif event.button() == Qt.LeftButton:
|
||||
# left button click
|
||||
textCursor = self.textCursor()
|
||||
if textCursor.selectedText():
|
||||
# mouse was used to select text -> nothing to do
|
||||
pass
|
||||
else:
|
||||
# a simple 'click', move scrollbar to end
|
||||
self.scroll_bar.setSliderPosition(self.scroll_bar.maximum())
|
||||
self.move_cursor()
|
||||
return None
|
||||
return super().mouseReleaseEvent(event)
|
||||
|
||||
def redraw_screen(self):
|
||||
"""
|
||||
Render the screen as formatted text into the widget.
|
||||
"""
|
||||
screen = self.backend.screen
|
||||
|
||||
# Clear the widget
|
||||
if screen.dirty:
|
||||
self.clear()
|
||||
while len(self.output) < (max(screen.dirty) + 1):
|
||||
self.output.append("")
|
||||
while len(self.output) > (max(screen.dirty) + 1):
|
||||
self.output.pop()
|
||||
|
||||
# Prepare the HTML output
|
||||
for line_no in screen.dirty:
|
||||
line = text = ""
|
||||
style = old_style = ""
|
||||
old_idx = 0
|
||||
for idx, ch in screen.buffer[line_no].items():
|
||||
text += " " * (idx - old_idx - 1)
|
||||
old_idx = idx
|
||||
style = f"{'background-color:%s;' % ansi_colors.get(ch.bg, ansi_colors['black']) if ch.bg!='default' else ''}{'color:%s;' % ansi_colors.get(ch.fg, ansi_colors['white']) if ch.fg!='default' else ''}{'font-weight:bold;' if ch.bold else ''}{'font-style:italic;' if ch.italics else ''}"
|
||||
if style != old_style:
|
||||
if old_style:
|
||||
line += f"<span style={repr(old_style)}>{html.escape(text, quote=True)}</span>"
|
||||
else:
|
||||
line += html.escape(text, quote=True)
|
||||
text = ""
|
||||
old_style = style
|
||||
text += ch.data
|
||||
if style:
|
||||
line += f"<span style={repr(style)}>{html.escape(text, quote=True)}</span>"
|
||||
else:
|
||||
line += html.escape(text, quote=True)
|
||||
# do a check at the cursor position:
|
||||
# it is possible x pos > output line length,
|
||||
# for example if last escape codes are "cursor forward" past end of text,
|
||||
# like IPython does for "..." prompt (in a block, like "for" loop or "while" for example)
|
||||
# In this case, cursor is at 12 but last text output is at 8 -> insert spaces
|
||||
if line_no == screen.cursor.y:
|
||||
llen = len(screen.buffer[line_no])
|
||||
if llen < screen.cursor.x:
|
||||
line += " " * (screen.cursor.x - llen)
|
||||
self.output[line_no] = line
|
||||
# fill the text area with HTML contents in one go
|
||||
self.appendHtml(f"<pre>{chr(10).join(self.output)}</pre>")
|
||||
|
||||
if self._prompt_re is not None:
|
||||
text_buf = self.toPlainText()
|
||||
prompt = self._prompt_re.search(text_buf)
|
||||
if prompt is None:
|
||||
if self._prompt_str:
|
||||
self.prompt.emit(False)
|
||||
self._prompt_str = None
|
||||
else:
|
||||
prompt_str = prompt.string.rstrip()
|
||||
if prompt_str != self._prompt_str:
|
||||
self._prompt_str = prompt_str
|
||||
self.prompt.emit(True)
|
||||
|
||||
# did updates, all clean
|
||||
screen.dirty.clear()
|
||||
|
||||
def update_term_size(self):
|
||||
fmt = QtGui.QFontMetrics(self.font())
|
||||
char_width = fmt.width("w")
|
||||
char_height = fmt.height()
|
||||
self._cols = int(self.width() / char_width)
|
||||
self._rows = int(self.height() / char_height)
|
||||
|
||||
def resizeEvent(self, event):
|
||||
self.update_term_size()
|
||||
if self.fd:
|
||||
self.backend.screen.resize(self._rows, self._cols)
|
||||
self.redraw_screen()
|
||||
self.adjust_scroll_bar()
|
||||
self.move_cursor()
|
||||
|
||||
def wheelEvent(self, event):
|
||||
if not self.fd:
|
||||
return
|
||||
y = event.angleDelta().y()
|
||||
if y > 0:
|
||||
self.backend.screen.prev_page()
|
||||
else:
|
||||
self.backend.screen.next_page()
|
||||
self.redraw_screen()
|
||||
|
||||
def fork_shell(self):
|
||||
"""
|
||||
Fork the current process and execute bec in shell.
|
||||
"""
|
||||
try:
|
||||
pid, fd = pty.fork()
|
||||
except (IOError, OSError):
|
||||
return False
|
||||
if pid == 0:
|
||||
try:
|
||||
ls = os.environ["LANG"].split(".")
|
||||
except KeyError:
|
||||
ls = []
|
||||
if len(ls) < 2:
|
||||
ls = ["en_US", "UTF-8"]
|
||||
os.putenv("COLUMNS", str(self.cols))
|
||||
os.putenv("LINES", str(self.rows))
|
||||
os.putenv("TERM", "linux")
|
||||
os.putenv("LANG", ls[0] + ".UTF-8")
|
||||
if not self._cmd:
|
||||
self._cmd = os.environ["SHELL"]
|
||||
cmd = self._cmd
|
||||
if isinstance(cmd, str):
|
||||
cmd = cmd.split()
|
||||
try:
|
||||
os.execvp(cmd[0], cmd)
|
||||
except (IOError, OSError):
|
||||
pass
|
||||
os._exit(0)
|
||||
else:
|
||||
# We are in the parent process.
|
||||
# Set file control
|
||||
fcntl.fcntl(fd, fcntl.F_SETFL, os.O_NONBLOCK)
|
||||
return pid, fd
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import os
|
||||
import sys
|
||||
|
||||
from qtpy import QtGui, QtWidgets
|
||||
|
||||
# Create the Qt application and console.
|
||||
app = QtWidgets.QApplication([])
|
||||
mainwin = QtWidgets.QMainWindow()
|
||||
title = "BECConsole"
|
||||
mainwin.setWindowTitle(title)
|
||||
|
||||
console = BECConsole(mainwin)
|
||||
mainwin.setCentralWidget(console)
|
||||
|
||||
def check_prompt(at_prompt):
|
||||
if at_prompt:
|
||||
print("NEW PROMPT")
|
||||
else:
|
||||
print("EXECUTING SOMETHING...")
|
||||
|
||||
console.set_prompt_tokens(
|
||||
(Token.OutPromptNum, "•"),
|
||||
(Token.Prompt, ""), # will match arbitrary string,
|
||||
(Token.Prompt, " ["),
|
||||
(Token.PromptNum, "3"),
|
||||
(Token.Prompt, "/"),
|
||||
(Token.PromptNum, "1"),
|
||||
(Token.Prompt, "] "),
|
||||
(Token.Prompt, "❯❯"),
|
||||
)
|
||||
console.prompt.connect(check_prompt)
|
||||
console.start()
|
||||
|
||||
# Show widget and launch Qt's event loop.
|
||||
mainwin.show()
|
||||
sys.exit(app.exec_())
|
||||
@@ -1 +0,0 @@
|
||||
{'files': ['console.py']}
|
||||
@@ -1,58 +0,0 @@
|
||||
# Copyright (C) 2022 The Qt Company Ltd.
|
||||
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
|
||||
import os
|
||||
|
||||
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
|
||||
|
||||
import bec_widgets
|
||||
from bec_widgets.utils.bec_designer import designer_material_icon
|
||||
from bec_widgets.widgets.editors.console.console import BECConsole
|
||||
|
||||
DOM_XML = """
|
||||
<ui language='c++'>
|
||||
<widget class='BECConsole' name='bec_console'>
|
||||
</widget>
|
||||
</ui>
|
||||
"""
|
||||
|
||||
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
|
||||
|
||||
|
||||
class BECConsolePlugin(QDesignerCustomWidgetInterface): # pragma: no cover
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._form_editor = None
|
||||
|
||||
def createWidget(self, parent):
|
||||
t = BECConsole(parent)
|
||||
return t
|
||||
|
||||
def domXml(self):
|
||||
return DOM_XML
|
||||
|
||||
def group(self):
|
||||
return "BEC Console"
|
||||
|
||||
def icon(self):
|
||||
return designer_material_icon(BECConsole.ICON_NAME)
|
||||
|
||||
def includeFile(self):
|
||||
return "bec_console"
|
||||
|
||||
def initialize(self, form_editor):
|
||||
self._form_editor = form_editor
|
||||
|
||||
def isContainer(self):
|
||||
return False
|
||||
|
||||
def isInitialized(self):
|
||||
return self._form_editor is not None
|
||||
|
||||
def name(self):
|
||||
return "BECConsole"
|
||||
|
||||
def toolTip(self):
|
||||
return "A terminal-like vt100 widget."
|
||||
|
||||
def whatsThis(self):
|
||||
return self.toolTip()
|
||||
@@ -1,15 +0,0 @@
|
||||
def main(): # pragma: no cover
|
||||
from qtpy import PYSIDE6
|
||||
|
||||
if not PYSIDE6:
|
||||
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
|
||||
return
|
||||
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
|
||||
|
||||
from bec_widgets.widgets.editors.console.console_plugin import BECConsolePlugin
|
||||
|
||||
QPyDesignerCustomWidgetCollection.addCustomWidget(BECConsolePlugin())
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
main()
|
||||
@@ -9,8 +9,19 @@ import pyqtgraph as pg
|
||||
from bec_lib import bec_logger, messages
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from pydantic import Field, ValidationError, field_validator
|
||||
from qtpy.QtCore import QTimer, Signal
|
||||
from qtpy.QtWidgets import QApplication, QDialog, QHBoxLayout, QMainWindow, QVBoxLayout, QWidget
|
||||
from qtpy.QtCore import Qt, QTimer, Signal
|
||||
from qtpy.QtWidgets import (
|
||||
QApplication,
|
||||
QCheckBox,
|
||||
QDialog,
|
||||
QDialogButtonBox,
|
||||
QDoubleSpinBox,
|
||||
QHBoxLayout,
|
||||
QLabel,
|
||||
QMainWindow,
|
||||
QVBoxLayout,
|
||||
QWidget,
|
||||
)
|
||||
|
||||
from bec_widgets.utils import ConnectionConfig
|
||||
from bec_widgets.utils.bec_signal_proxy import BECSignalProxy
|
||||
@@ -33,6 +44,11 @@ class WaveformConfig(ConnectionConfig):
|
||||
color_palette: str | None = Field(
|
||||
"plasma", description="The color palette of the figure widget.", validate_default=True
|
||||
)
|
||||
max_dataset_size_mb: float = Field(
|
||||
10,
|
||||
description="Maximum dataset size (in MB) permitted when fetching async data from history before prompting the user.",
|
||||
validate_default=True,
|
||||
)
|
||||
|
||||
model_config: dict = {"validate_assignment": True}
|
||||
_validate_color_palette = field_validator("color_palette")(Colors.validate_color_map)
|
||||
@@ -96,6 +112,12 @@ class Waveform(PlotBase):
|
||||
"x_entry.setter",
|
||||
"color_palette",
|
||||
"color_palette.setter",
|
||||
"skip_large_dataset_warning",
|
||||
"skip_large_dataset_warning.setter",
|
||||
"skip_large_dataset_check",
|
||||
"skip_large_dataset_check.setter",
|
||||
"max_dataset_size_mb",
|
||||
"max_dataset_size_mb.setter",
|
||||
"plot",
|
||||
"add_dap_curve",
|
||||
"remove_curve",
|
||||
@@ -144,6 +166,7 @@ class Waveform(PlotBase):
|
||||
self._mode: Literal["none", "sync", "async", "mixed"] = "none"
|
||||
|
||||
# Scan data
|
||||
self._scan_done = True # means scan is not running
|
||||
self.old_scan_id = None
|
||||
self.scan_id = None
|
||||
self.scan_item = None
|
||||
@@ -163,6 +186,10 @@ class Waveform(PlotBase):
|
||||
self._init_curve_dialog()
|
||||
self.curve_settings_dialog = None
|
||||
|
||||
# Large‑dataset guard
|
||||
self._skip_large_dataset_warning = False # session flag
|
||||
self._skip_large_dataset_check = False # per-plot flag, to skip the warning for this plot
|
||||
|
||||
# Scan status update loop
|
||||
self.bec_dispatcher.connect_slot(self.on_scan_status, MessageEndpoints.scan_status())
|
||||
self.bec_dispatcher.connect_slot(self.on_scan_progress, MessageEndpoints.scan_progress())
|
||||
@@ -561,6 +588,59 @@ class Waveform(PlotBase):
|
||||
"""
|
||||
return [item for item in self.plot_item.curves if isinstance(item, Curve)]
|
||||
|
||||
@SafeProperty(bool)
|
||||
def skip_large_dataset_check(self) -> bool:
|
||||
"""
|
||||
Whether to skip the large dataset warning when fetching async data.
|
||||
"""
|
||||
return self._skip_large_dataset_check
|
||||
|
||||
@skip_large_dataset_check.setter
|
||||
def skip_large_dataset_check(self, value: bool):
|
||||
"""
|
||||
Set whether to skip the large dataset warning when fetching async data.
|
||||
|
||||
Args:
|
||||
value(bool): Whether to skip the large dataset warning.
|
||||
"""
|
||||
self._skip_large_dataset_check = value
|
||||
|
||||
@SafeProperty(bool)
|
||||
def skip_large_dataset_warning(self) -> bool:
|
||||
"""
|
||||
Whether to skip the large dataset warning when fetching async data.
|
||||
"""
|
||||
return self._skip_large_dataset_warning
|
||||
|
||||
@skip_large_dataset_warning.setter
|
||||
def skip_large_dataset_warning(self, value: bool):
|
||||
"""
|
||||
Set whether to skip the large dataset warning when fetching async data.
|
||||
|
||||
Args:
|
||||
value(bool): Whether to skip the large dataset warning.
|
||||
"""
|
||||
self._skip_large_dataset_warning = value
|
||||
|
||||
@SafeProperty(float)
|
||||
def max_dataset_size_mb(self) -> float:
|
||||
"""
|
||||
The maximum dataset size (in MB) permitted when fetching async data from history before prompting the user.
|
||||
"""
|
||||
return self.config.max_dataset_size_mb
|
||||
|
||||
@max_dataset_size_mb.setter
|
||||
def max_dataset_size_mb(self, value: float):
|
||||
"""
|
||||
Set the maximum dataset size (in MB) permitted when fetching async data from history before prompting the user.
|
||||
|
||||
Args:
|
||||
value(float): The maximum dataset size in MB.
|
||||
"""
|
||||
if value <= 0:
|
||||
raise ValueError("Maximum dataset size must be greater than 0.")
|
||||
self.config.max_dataset_size_mb = value
|
||||
|
||||
################################################################################
|
||||
# High Level methods for API
|
||||
################################################################################
|
||||
@@ -807,8 +887,6 @@ class Waveform(PlotBase):
|
||||
if config.source == "device":
|
||||
if self.scan_item is None:
|
||||
self.update_with_scan_history(-1)
|
||||
if curve in self._async_curves:
|
||||
self._setup_async_curve(curve)
|
||||
self.async_signal_update.emit()
|
||||
self.sync_signal_update.emit()
|
||||
if config.source == "dap":
|
||||
@@ -1056,8 +1134,8 @@ class Waveform(PlotBase):
|
||||
meta(dict): The message metadata.
|
||||
"""
|
||||
self.sync_signal_update.emit()
|
||||
status = msg.get("done")
|
||||
if status:
|
||||
self._scan_done = msg.get("done")
|
||||
if self._scan_done:
|
||||
QTimer.singleShot(100, self.update_sync_curves)
|
||||
QTimer.singleShot(300, self.update_sync_curves)
|
||||
|
||||
@@ -1135,9 +1213,11 @@ class Waveform(PlotBase):
|
||||
if access_key == "val": # live access
|
||||
device_data = data.get(device_name, {}).get(device_entry, {}).get(access_key, None)
|
||||
else: # history access
|
||||
device_data = (
|
||||
data.get(device_name, {}).get(device_entry, {}).read().get("value", None)
|
||||
)
|
||||
dataset_obj = data.get(device_name, {})
|
||||
if self._skip_large_dataset_check is False:
|
||||
if not self._check_dataset_size_and_confirm(dataset_obj, device_entry):
|
||||
continue # user declined to load; skip this curve
|
||||
device_data = dataset_obj.get(device_entry, {}).read().get("value", None)
|
||||
|
||||
# if shape is 2D cast it into 1D and take the last waveform
|
||||
if len(np.shape(device_data)) > 1:
|
||||
@@ -1581,6 +1661,8 @@ class Waveform(PlotBase):
|
||||
dev_name = curve.config.signal.name
|
||||
if dev_name in readout_priority_async:
|
||||
self._async_curves.append(curve)
|
||||
if hasattr(self.scan_item, "live_data"):
|
||||
self._setup_async_curve(curve)
|
||||
found_async = True
|
||||
elif dev_name in readout_priority_sync:
|
||||
self._sync_curves.append(curve)
|
||||
@@ -1657,6 +1739,106 @@ class Waveform(PlotBase):
|
||||
################################################################################
|
||||
# Utility Methods
|
||||
################################################################################
|
||||
|
||||
# Large dataset handling helpers
|
||||
def _check_dataset_size_and_confirm(self, dataset_obj, device_entry: str) -> bool:
|
||||
"""
|
||||
Check the size of the dataset and confirm with the user if it exceeds the limit.
|
||||
|
||||
Args:
|
||||
dataset_obj: The dataset object containing the information.
|
||||
device_entry( str): The specific device entry to check.
|
||||
|
||||
Returns:
|
||||
bool: True if the dataset is within the size limit or user confirmed to load it,
|
||||
False if the dataset exceeds the size limit and user declined to load it.
|
||||
"""
|
||||
try:
|
||||
info = dataset_obj._info
|
||||
mem_bytes = info.get(device_entry, {}).get("value", {}).get("mem_size", 0)
|
||||
# Fallback – grab first entry if lookup failed
|
||||
if mem_bytes == 0 and info:
|
||||
first_key = next(iter(info))
|
||||
mem_bytes = info[first_key]["value"]["mem_size"]
|
||||
size_mb = mem_bytes / (1024 * 1024)
|
||||
print(f"Dataset size: {size_mb:.1f} MB")
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.error(f"Unable to evaluate dataset size: {exc}")
|
||||
return True
|
||||
|
||||
if size_mb <= self.config.max_dataset_size_mb:
|
||||
return True
|
||||
logger.warning(
|
||||
f"Attempt to load large dataset: {size_mb:.1f} MB "
|
||||
f"(limit {self.config.max_dataset_size_mb} MB)"
|
||||
)
|
||||
if self._skip_large_dataset_warning:
|
||||
logger.info("Skipping large dataset warning dialog.")
|
||||
return False
|
||||
return self._confirm_large_dataset(size_mb)
|
||||
|
||||
def _confirm_large_dataset(self, size_mb: float) -> bool:
|
||||
"""
|
||||
Confirm with the user whether to load a large dataset with dialog popup.
|
||||
Also allows the user to adjust the maximum dataset size limit and if user
|
||||
wants to see this popup again during session.
|
||||
|
||||
Args:
|
||||
size_mb(float): Size of the dataset in MB.
|
||||
|
||||
Returns:
|
||||
bool: True if the user confirmed to load the dataset, False otherwise.
|
||||
"""
|
||||
if self._skip_large_dataset_warning:
|
||||
return True
|
||||
|
||||
dialog = QDialog(self)
|
||||
dialog.setWindowTitle("Large dataset detected")
|
||||
main_dialog_layout = QVBoxLayout(dialog)
|
||||
|
||||
# Limit adjustment widgets
|
||||
limit_adjustment_layout = QHBoxLayout()
|
||||
limit_adjustment_layout.addWidget(QLabel("New limit (MB):"))
|
||||
spin = QDoubleSpinBox()
|
||||
spin.setRange(0.001, 4096)
|
||||
spin.setDecimals(3)
|
||||
spin.setSingleStep(0.01)
|
||||
spin.setValue(self.config.max_dataset_size_mb)
|
||||
spin.valueChanged.connect(lambda value: setattr(self.config, "max_dataset_size_mb", value))
|
||||
limit_adjustment_layout.addWidget(spin)
|
||||
|
||||
# Don't show again checkbox
|
||||
checkbox = QCheckBox("Don't show this again for this session")
|
||||
|
||||
buttons = QDialogButtonBox(
|
||||
QDialogButtonBox.Yes | QDialogButtonBox.No, Qt.Horizontal, dialog
|
||||
)
|
||||
buttons.accepted.connect(dialog.accept) # Yes
|
||||
buttons.rejected.connect(dialog.reject) # No
|
||||
|
||||
# widget layout
|
||||
main_dialog_layout.addWidget(
|
||||
QLabel(
|
||||
f"The selected dataset is {size_mb:.1f} MB which exceeds the "
|
||||
f"current limit of {self.config.max_dataset_size_mb} MB.\n"
|
||||
)
|
||||
)
|
||||
main_dialog_layout.addLayout(limit_adjustment_layout)
|
||||
main_dialog_layout.addWidget(checkbox)
|
||||
main_dialog_layout.addWidget(QLabel("Would you like to display dataset anyway?"))
|
||||
main_dialog_layout.addWidget(buttons)
|
||||
|
||||
result = dialog.exec() # modal; waits for user choice
|
||||
|
||||
# Respect the “don't show again” checkbox for *either* choice
|
||||
if checkbox.isChecked():
|
||||
self._skip_large_dataset_warning = True
|
||||
|
||||
if result == QDialog.Accepted:
|
||||
self.config.max_dataset_size_mb = spin.value()
|
||||
return True
|
||||
return False
|
||||
|
||||
def _ensure_str_list(self, entries: list | tuple | np.ndarray):
|
||||
"""
|
||||
Convert a variety of possible inputs (string, bytes, list/tuple/ndarray of either)
|
||||
@@ -1787,7 +1969,7 @@ class DemoApp(QMainWindow): # pragma: no cover
|
||||
self.setCentralWidget(self.main_widget)
|
||||
|
||||
self.waveform_popup = Waveform(popups=True)
|
||||
self.waveform_popup.plot(y_name="monitor_async")
|
||||
self.waveform_popup.plot(y_name="waveform")
|
||||
|
||||
self.waveform_side = Waveform(popups=False)
|
||||
self.waveform_side.plot(y_name="bpm4i", y_entry="bpm4i", dap="GaussianModel")
|
||||
|
||||
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
|
||||
|
||||
[project]
|
||||
name = "bec_widgets"
|
||||
version = "2.9.2"
|
||||
version = "2.10.2"
|
||||
description = "BEC Widgets"
|
||||
requires-python = ">=3.10"
|
||||
classifiers = [
|
||||
@@ -21,7 +21,6 @@ dependencies = [
|
||||
"pydantic~=2.0",
|
||||
"pyqtgraph~=0.13",
|
||||
"PySide6~=6.8.2",
|
||||
"pyte", # needed for vt100 console
|
||||
"qtconsole~=5.5, >=5.5.1", # needed for jupyter console
|
||||
"qtpy~=2.4",
|
||||
]
|
||||
|
||||
@@ -1,65 +0,0 @@
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from pygments.token import Token
|
||||
from qtpy.QtCore import QEventLoop
|
||||
|
||||
from bec_widgets.utils.colors import apply_theme
|
||||
from bec_widgets.widgets.editors.console.console import BECConsole
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def console_widget(qtbot):
|
||||
apply_theme("light")
|
||||
console = BECConsole()
|
||||
console.set_cmd(sys.executable) # will launch Python interpreter
|
||||
console.set_prompt_tokens((Token.Prompt, ">>>"))
|
||||
qtbot.addWidget(console)
|
||||
console.show()
|
||||
qtbot.waitExposed(console)
|
||||
yield console
|
||||
console.terminate()
|
||||
|
||||
|
||||
def test_console_widget(console_widget, qtbot, tmp_path):
|
||||
def wait_prompt(command_to_execute=None, busy=False):
|
||||
signal_waiter = QEventLoop()
|
||||
|
||||
def exit_loop(idle):
|
||||
if busy and not idle:
|
||||
signal_waiter.quit()
|
||||
elif not busy and idle:
|
||||
signal_waiter.quit()
|
||||
|
||||
console_widget.prompt.connect(exit_loop)
|
||||
if command_to_execute:
|
||||
if callable(command_to_execute):
|
||||
command_to_execute()
|
||||
else:
|
||||
console_widget.execute_command(command_to_execute)
|
||||
signal_waiter.exec_()
|
||||
|
||||
console_widget.start()
|
||||
wait_prompt()
|
||||
|
||||
# use console to write something to a tmp file
|
||||
tmp_filename = str(tmp_path / "console_test.txt")
|
||||
wait_prompt(f"f = open('{tmp_filename}', 'wt'); f.write('HELLO CONSOLE'); f.close()")
|
||||
# check the code has been executed by console, by checking the tmp file contents
|
||||
with open(tmp_filename, "rt") as f:
|
||||
assert f.read() == "HELLO CONSOLE"
|
||||
|
||||
# execute a sleep
|
||||
t0 = time.perf_counter()
|
||||
wait_prompt("import time; time.sleep(1)")
|
||||
assert time.perf_counter() - t0 >= 1
|
||||
|
||||
# test ctrl-c
|
||||
t0 = time.perf_counter()
|
||||
wait_prompt("time.sleep(5)", busy=True)
|
||||
wait_prompt(console_widget.send_ctrl_c)
|
||||
assert (
|
||||
time.perf_counter() - t0 < 1
|
||||
) # in reality it will be almost immediate, but ok we can say less than 1 second compared to 5
|
||||
@@ -1,3 +1,5 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from types import SimpleNamespace
|
||||
from unittest import mock
|
||||
@@ -7,6 +9,15 @@ import numpy as np
|
||||
import pyqtgraph as pg
|
||||
import pytest
|
||||
from pyqtgraph.graphicsItems.DateAxisItem import DateAxisItem
|
||||
from qtpy.QtCore import QTimer
|
||||
from qtpy.QtWidgets import (
|
||||
QApplication,
|
||||
QCheckBox,
|
||||
QDialog,
|
||||
QDialogButtonBox,
|
||||
QDoubleSpinBox,
|
||||
QSpinBox,
|
||||
)
|
||||
|
||||
from bec_widgets.widgets.plots.plot_base import UIMode
|
||||
from bec_widgets.widgets.plots.waveform.curve import DeviceSignal
|
||||
@@ -533,6 +544,7 @@ def test_on_async_readback_add_update(qtbot, mocked_client):
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
wf.scan_item = create_dummy_scan_item()
|
||||
wf._scan_done = False # simulate a live scan
|
||||
c = wf.plot(arg1="async_device", label="async_device-async_device")
|
||||
wf._async_curves = [c]
|
||||
# Suppose existing data
|
||||
@@ -819,3 +831,227 @@ def test_show_dap_summary_popup(qtbot, mocked_client):
|
||||
wf.dap_summary_dialog.close()
|
||||
assert wf.dap_summary_dialog is None
|
||||
assert fit_action.isChecked() is False
|
||||
|
||||
|
||||
#####################################################
|
||||
# The following tests are for the async dataset guard
|
||||
#####################################################
|
||||
|
||||
|
||||
def test_skip_large_dataset_warning_property(qtbot, mocked_client):
|
||||
"""
|
||||
Verify the getter and setter of skip_large_dataset_warning work correctly.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
|
||||
# Default should be False
|
||||
assert wf.skip_large_dataset_warning is False
|
||||
|
||||
# Set to True
|
||||
wf.skip_large_dataset_warning = True
|
||||
assert wf.skip_large_dataset_warning is True
|
||||
|
||||
# Toggle back to False
|
||||
wf.skip_large_dataset_warning = False
|
||||
assert wf.skip_large_dataset_warning is False
|
||||
|
||||
|
||||
def test_max_dataset_size_mb_property(qtbot, mocked_client):
|
||||
"""
|
||||
Verify getter, setter, and validation of max_dataset_size_mb.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
|
||||
# Default from WaveformConfig is 1 MB
|
||||
assert wf.max_dataset_size_mb == 10
|
||||
|
||||
# Set to a valid new value
|
||||
wf.max_dataset_size_mb = 5.5
|
||||
assert wf.max_dataset_size_mb == 5.5
|
||||
# Ensure the config is updated too
|
||||
assert wf.config.max_dataset_size_mb == 5.5
|
||||
|
||||
|
||||
def _dummy_dataset(mem_bytes: int, entry: str = "waveform_waveform"):
|
||||
"""
|
||||
Return an object that mimics the BEC dataset structure:
|
||||
it has exactly one attribute `_info` with the expected layout.
|
||||
"""
|
||||
return SimpleNamespace(_info={entry: {"value": {"mem_size": mem_bytes}}})
|
||||
|
||||
|
||||
def test_dataset_guard_under_limit(qtbot, mocked_client, monkeypatch):
|
||||
"""
|
||||
Dataset below the limit should load without triggering the dialog.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
wf.max_dataset_size_mb = 1 # 1 MiB
|
||||
|
||||
# If the dialog is called, we flip this flag – it must stay False.
|
||||
called = {"dlg": False}
|
||||
monkeypatch.setattr(
|
||||
Waveform, "_confirm_large_dataset", lambda self, size_mb: called.__setitem__("dlg", True)
|
||||
)
|
||||
|
||||
dataset = _dummy_dataset(mem_bytes=512_000) # ≈0.49 MiB
|
||||
assert wf._check_dataset_size_and_confirm(dataset, "waveform_waveform") is True
|
||||
assert called["dlg"] is False
|
||||
|
||||
|
||||
def test_dataset_guard_over_limit_accept(qtbot, mocked_client, monkeypatch):
|
||||
"""
|
||||
Dataset above the limit where user presses *Yes*.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
wf.max_dataset_size_mb = 1 # 1 MiB
|
||||
|
||||
# Pretend the user clicked “Yes”
|
||||
monkeypatch.setattr(Waveform, "_confirm_large_dataset", lambda *_: True)
|
||||
|
||||
dataset = _dummy_dataset(mem_bytes=2_000_000) # ≈1.9 MiB
|
||||
assert wf._check_dataset_size_and_confirm(dataset, "waveform_waveform") is True
|
||||
|
||||
|
||||
def test_dataset_guard_over_limit_reject(qtbot, mocked_client, monkeypatch):
|
||||
"""
|
||||
Dataset above the limit where user presses *No*.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
wf.max_dataset_size_mb = 1 # 1 MiB
|
||||
|
||||
# Pretend the user clicked “No”
|
||||
monkeypatch.setattr(Waveform, "_confirm_large_dataset", lambda *_: False)
|
||||
|
||||
dataset = _dummy_dataset(mem_bytes=2_000_000) # ≈1.9 MiB
|
||||
assert wf._check_dataset_size_and_confirm(dataset, "waveform_waveform") is False
|
||||
|
||||
|
||||
##################################################
|
||||
# Dialog propagation behaviour
|
||||
##################################################
|
||||
|
||||
|
||||
def test_dialog_accept_updates_limit(monkeypatch, qtbot, mocked_client):
|
||||
"""
|
||||
Simulate clicking 'Yes' in the dialog *after* changing the spinner value.
|
||||
Verify max_dataset_size_mb is updated and dataset loads.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
wf.max_dataset_size_mb = 1 # start small
|
||||
|
||||
def fake_confirm(self, size_mb):
|
||||
# Simulate user typing '5' in the spinbox then pressing Yes
|
||||
self.config.max_dataset_size_mb = 5
|
||||
return True # Yes pressed
|
||||
|
||||
monkeypatch.setattr(Waveform, "_confirm_large_dataset", fake_confirm)
|
||||
|
||||
big_dataset = _dummy_dataset(mem_bytes=4_800_000) # ≈4.6 MiB
|
||||
accepted = wf._check_dataset_size_and_confirm(big_dataset, "waveform_waveform")
|
||||
|
||||
# The load should be accepted and the limit must reflect the new value
|
||||
assert accepted is True
|
||||
assert wf.max_dataset_size_mb == 5
|
||||
assert wf.config.max_dataset_size_mb == 5
|
||||
|
||||
|
||||
def test_dialog_cancel_sets_skip(monkeypatch, qtbot, mocked_client):
|
||||
"""
|
||||
Simulate clicking 'No' but ticking 'Don't show again'.
|
||||
Verify skip_large_dataset_warning becomes True and dataset is skipped.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
assert wf.skip_large_dataset_warning is False
|
||||
|
||||
def fake_confirm(self, size_mb):
|
||||
# Mimic ticking the checkbox then pressing No
|
||||
self._skip_large_dataset_warning = True
|
||||
return False # No pressed
|
||||
|
||||
monkeypatch.setattr(Waveform, "_confirm_large_dataset", fake_confirm)
|
||||
|
||||
big_dataset = _dummy_dataset(mem_bytes=11_000_000)
|
||||
accepted = wf._check_dataset_size_and_confirm(big_dataset, "waveform_waveform")
|
||||
|
||||
# Dataset must not load, but future warnings are suppressed
|
||||
assert accepted is False
|
||||
assert wf.skip_large_dataset_warning is True
|
||||
|
||||
|
||||
##################################################
|
||||
# Live dialog interaction (no monkey‑patching)
|
||||
##################################################
|
||||
|
||||
|
||||
def _open_dialog_and_click(handler):
|
||||
"""
|
||||
Utility that schedules *handler* to run as soon as a modal
|
||||
dialog is shown. Returns a function suitable for QTimer.singleShot.
|
||||
"""
|
||||
|
||||
def _cb():
|
||||
# Locate the active modal dialog
|
||||
dlg = QApplication.activeModalWidget()
|
||||
assert isinstance(dlg, QDialog), "No active modal dialog found"
|
||||
handler(dlg)
|
||||
|
||||
return _cb
|
||||
|
||||
|
||||
def test_dialog_accept_real_interaction(qtbot, mocked_client):
|
||||
"""
|
||||
End‑to‑end: user changes the limit spinner to 5 MiB, ticks
|
||||
'don't show again', then presses YES.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
wf.max_dataset_size_mb = 1
|
||||
|
||||
# Prepare a large dataset (≈4.6 MiB)
|
||||
big_dataset = _dummy_dataset(mem_bytes=4_800_000)
|
||||
|
||||
def handler(dlg):
|
||||
spin: QDoubleSpinBox = dlg.findChild(QDoubleSpinBox)
|
||||
chk: QCheckBox = dlg.findChild(QCheckBox)
|
||||
btns: QDialogButtonBox = dlg.findChild(QDialogButtonBox)
|
||||
|
||||
# # Interact with widgets
|
||||
spin.setValue(5)
|
||||
chk.setChecked(True)
|
||||
|
||||
yes_btn = btns.button(QDialogButtonBox.Yes)
|
||||
yes_btn.click()
|
||||
|
||||
# Schedule the handler right before invoking the check
|
||||
QTimer.singleShot(0, _open_dialog_and_click(handler))
|
||||
|
||||
accepted = wf._check_dataset_size_and_confirm(big_dataset, "waveform_waveform")
|
||||
assert accepted is True
|
||||
assert wf.max_dataset_size_mb == 5
|
||||
assert wf.skip_large_dataset_warning is True
|
||||
|
||||
|
||||
def test_dialog_reject_real_interaction(qtbot, mocked_client):
|
||||
"""
|
||||
End‑to‑end: user leaves spinner unchanged, ticks 'don't show again',
|
||||
and presses NO.
|
||||
"""
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client)
|
||||
wf.max_dataset_size_mb = 1
|
||||
|
||||
big_dataset = _dummy_dataset(mem_bytes=4_800_000)
|
||||
|
||||
def handler(dlg):
|
||||
chk: QCheckBox = dlg.findChild(QCheckBox)
|
||||
btns: QDialogButtonBox = dlg.findChild(QDialogButtonBox)
|
||||
|
||||
chk.setChecked(True)
|
||||
no_btn = btns.button(QDialogButtonBox.No)
|
||||
no_btn.click()
|
||||
|
||||
QTimer.singleShot(0, _open_dialog_and_click(handler))
|
||||
|
||||
accepted = wf._check_dataset_size_and_confirm(big_dataset, "waveform_waveform")
|
||||
assert accepted is False
|
||||
assert wf.skip_large_dataset_warning is True
|
||||
# Limit remains unchanged
|
||||
assert wf.max_dataset_size_mb == 1
|
||||
|
||||
Reference in New Issue
Block a user