1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-16 05:30:54 +02:00

Compare commits

...

11 Commits

Author SHA1 Message Date
semantic-release
f68f072da3 2.10.2
Automatically generated by python-semantic-release
2025-06-03 11:57:23 +00:00
1df6c1925b fix: remove unnecessary PySide imports 2025-06-03 13:56:35 +02:00
6b939ac34d ci: check for disallowed imports from PySide 2025-06-03 13:56:35 +02:00
semantic-release
6bcf20af07 2.10.1
Automatically generated by python-semantic-release
2025-06-02 18:37:30 +00:00
a64cf0dd87 build: pyte removed from dependencies 2025-06-02 20:36:51 +02:00
cd4e90a79f fix(console): qt console widget deleted 2025-06-02 20:36:51 +02:00
semantic-release
49a96a18d6 2.10.0
Automatically generated by python-semantic-release
2025-06-02 13:51:20 +00:00
2b4454a291 ci: fix artifact version 2025-06-02 15:50:41 +02:00
d12bd9fe1a ci: add job logs to e2e test 2025-06-02 15:50:41 +02:00
d0c1ac0cf5 feat(waveform): large async dataset warning popup 2025-06-02 15:50:41 +02:00
f90150d1c7 fix(waveform): waveform only update async data when scan is currently running 2025-06-02 15:50:41 +02:00
15 changed files with 536 additions and 1026 deletions

View File

@@ -47,4 +47,12 @@ jobs:
source ./bin/install_bec_dev.sh -t
cd ../
pip install -e ./ophyd_devices -e .[dev,pyside6] -e ./bec_testing_plugin
pytest -v --files-path ./ --start-servers --random-order ./tests/end-2-end
pytest -v --files-path ./ --start-servers --random-order ./tests/end-2-end
- name: Upload logs if job fails
if: failure()
uses: actions/upload-artifact@v4
with:
name: pytest-logs
path: ./logs/*.log
retention-days: 7

View File

@@ -14,10 +14,15 @@ jobs:
- name: Run black and isort
run: |
pip install black isort
pip install -e .[dev]
pip install uv
uv pip install --system black isort
uv pip install --system -e .[dev]
black --check --diff --color .
isort --check --diff ./
- name: Check for disallowed imports from PySide
run: '! grep -re "from PySide6\." bec_widgets/ | grep -v -e "PySide6.QtDesigner" -e "PySide6.scripts"'
Pylint:
runs-on: ubuntu-latest

View File

@@ -1,6 +1,53 @@
# CHANGELOG
## v2.10.2 (2025-06-03)
### Bug Fixes
- Remove unnecessary PySide imports
([`1df6c19`](https://github.com/bec-project/bec_widgets/commit/1df6c1925b6ec88df8d7a1a5a79a5ddc6b1161b5))
### Continuous Integration
- Check for disallowed imports from PySide
([`6b939ac`](https://github.com/bec-project/bec_widgets/commit/6b939ac34d01cdbb0e8e32a0bd4e56cad032e75b))
## v2.10.1 (2025-06-02)
### Bug Fixes
- **console**: Qt console widget deleted
([`cd4e90a`](https://github.com/bec-project/bec_widgets/commit/cd4e90a79fcdbc96f4ec23db22375d05a48731db))
### Build System
- Pyte removed from dependencies
([`a64cf0d`](https://github.com/bec-project/bec_widgets/commit/a64cf0dd871c1419e02d3803c74cc45966baac19))
## v2.10.0 (2025-06-02)
### Bug Fixes
- **waveform**: Waveform only update async data when scan is currently running
([`f90150d`](https://github.com/bec-project/bec_widgets/commit/f90150d1c708331d4ee78f82ebf5ef23cd81fd17))
### Continuous Integration
- Add job logs to e2e test
([`d12bd9f`](https://github.com/bec-project/bec_widgets/commit/d12bd9fe1a010babc94dc86405d1b75a2b07534c))
- Fix artifact version
([`2b4454a`](https://github.com/bec-project/bec_widgets/commit/2b4454a291bc69399ddd08780c44e1339825fb36))
### Features
- **waveform**: Large async dataset warning popup
([`d0c1ac0`](https://github.com/bec-project/bec_widgets/commit/d0c1ac0cf5d421d14c9e050ccf5832cd30ca0764))
## v2.9.2 (2025-05-30)
### Bug Fixes

View File

@@ -3970,6 +3970,48 @@ class Waveform(RPCBase):
The color palette of the figure widget.
"""
@property
@rpc_call
def skip_large_dataset_warning(self) -> "bool":
"""
Whether to skip the large dataset warning when fetching async data.
"""
@skip_large_dataset_warning.setter
@rpc_call
def skip_large_dataset_warning(self) -> "bool":
"""
Whether to skip the large dataset warning when fetching async data.
"""
@property
@rpc_call
def skip_large_dataset_check(self) -> "bool":
"""
Whether to skip the large dataset warning when fetching async data.
"""
@skip_large_dataset_check.setter
@rpc_call
def skip_large_dataset_check(self) -> "bool":
"""
Whether to skip the large dataset warning when fetching async data.
"""
@property
@rpc_call
def max_dataset_size_mb(self) -> "float":
"""
The maximum dataset size (in MB) permitted when fetching async data from history before prompting the user.
"""
@max_dataset_size_mb.setter
@rpc_call
def max_dataset_size_mb(self) -> "float":
"""
The maximum dataset size (in MB) permitted when fetching async data from history before prompting the user.
"""
@rpc_call
def plot(
self,

View File

@@ -1,5 +1,5 @@
from bec_lib.logger import bec_logger
from PySide6.QtGui import QCloseEvent
from qtpy.QtGui import QCloseEvent
from qtpy.QtWidgets import QDialog, QDialogButtonBox, QHBoxLayout, QPushButton, QVBoxLayout, QWidget
from bec_widgets.utils.error_popups import SafeSlot

View File

@@ -9,7 +9,7 @@ from bec_widgets.utils.plugin_utils import get_custom_classes
logger = bec_logger.logger
if PYSIDE6:
from PySide6.QtUiTools import QUiLoader
from qtpy.QtUiTools import QUiLoader
class CustomUiLoader(QUiLoader):
def __init__(self, baseinstance, custom_widgets: dict | None = None):

View File

@@ -1,870 +0,0 @@
"""
BECConsole is a Qt widget that runs a Bash shell.
BECConsole VT100 emulation is powered by Pyte,
(https://github.com/selectel/pyte).
"""
import collections
import fcntl
import html
import os
import pty
import re
import signal
import sys
import time
import pyte
from pygments.token import Token
from pyte.screens import History
from qtpy import QtCore, QtGui, QtWidgets
from qtpy.QtCore import Property as pyqtProperty
from qtpy.QtCore import QSize, QSocketNotifier, Qt, QTimer
from qtpy.QtCore import Signal as pyqtSignal
from qtpy.QtGui import QClipboard, QColor, QPalette, QTextCursor
from qtpy.QtWidgets import QApplication, QHBoxLayout, QScrollBar, QSizePolicy
from bec_widgets.utils.error_popups import SafeSlot as Slot
ansi_colors = {
"black": "#000000",
"red": "#CD0000",
"green": "#00CD00",
"brown": "#996633", # Brown, replacing the yellow
"blue": "#0000EE",
"magenta": "#CD00CD",
"cyan": "#00CDCD",
"white": "#E5E5E5",
"brightblack": "#7F7F7F",
"brightred": "#FF0000",
"brightgreen": "#00FF00",
"brightyellow": "#FFFF00",
"brightblue": "#5C5CFF",
"brightmagenta": "#FF00FF",
"brightcyan": "#00FFFF",
"brightwhite": "#FFFFFF",
}
control_keys_mapping = {
QtCore.Qt.Key_A: b"\x01", # Ctrl-A
QtCore.Qt.Key_B: b"\x02", # Ctrl-B
QtCore.Qt.Key_C: b"\x03", # Ctrl-C
QtCore.Qt.Key_D: b"\x04", # Ctrl-D
QtCore.Qt.Key_E: b"\x05", # Ctrl-E
QtCore.Qt.Key_F: b"\x06", # Ctrl-F
QtCore.Qt.Key_G: b"\x07", # Ctrl-G (Bell)
QtCore.Qt.Key_H: b"\x08", # Ctrl-H (Backspace)
QtCore.Qt.Key_I: b"\x09", # Ctrl-I (Tab)
QtCore.Qt.Key_J: b"\x0a", # Ctrl-J (Line Feed)
QtCore.Qt.Key_K: b"\x0b", # Ctrl-K (Vertical Tab)
QtCore.Qt.Key_L: b"\x0c", # Ctrl-L (Form Feed)
QtCore.Qt.Key_M: b"\x0d", # Ctrl-M (Carriage Return)
QtCore.Qt.Key_N: b"\x0e", # Ctrl-N
QtCore.Qt.Key_O: b"\x0f", # Ctrl-O
QtCore.Qt.Key_P: b"\x10", # Ctrl-P
QtCore.Qt.Key_Q: b"\x11", # Ctrl-Q
QtCore.Qt.Key_R: b"\x12", # Ctrl-R
QtCore.Qt.Key_S: b"\x13", # Ctrl-S
QtCore.Qt.Key_T: b"\x14", # Ctrl-T
QtCore.Qt.Key_U: b"\x15", # Ctrl-U
QtCore.Qt.Key_V: b"\x16", # Ctrl-V
QtCore.Qt.Key_W: b"\x17", # Ctrl-W
QtCore.Qt.Key_X: b"\x18", # Ctrl-X
QtCore.Qt.Key_Y: b"\x19", # Ctrl-Y
QtCore.Qt.Key_Z: b"\x1a", # Ctrl-Z
QtCore.Qt.Key_Escape: b"\x1b", # Ctrl-Escape
QtCore.Qt.Key_Backslash: b"\x1c", # Ctrl-\
QtCore.Qt.Key_Underscore: b"\x1f", # Ctrl-_
}
normal_keys_mapping = {
QtCore.Qt.Key_Return: b"\n",
QtCore.Qt.Key_Space: b" ",
QtCore.Qt.Key_Enter: b"\n",
QtCore.Qt.Key_Tab: b"\t",
QtCore.Qt.Key_Backspace: b"\x08",
QtCore.Qt.Key_Home: b"\x47",
QtCore.Qt.Key_End: b"\x4f",
QtCore.Qt.Key_Left: b"\x02",
QtCore.Qt.Key_Up: b"\x10",
QtCore.Qt.Key_Right: b"\x06",
QtCore.Qt.Key_Down: b"\x0e",
QtCore.Qt.Key_PageUp: b"\x49",
QtCore.Qt.Key_PageDown: b"\x51",
QtCore.Qt.Key_F1: b"\x1b\x31",
QtCore.Qt.Key_F2: b"\x1b\x32",
QtCore.Qt.Key_F3: b"\x1b\x33",
QtCore.Qt.Key_F4: b"\x1b\x34",
QtCore.Qt.Key_F5: b"\x1b\x35",
QtCore.Qt.Key_F6: b"\x1b\x36",
QtCore.Qt.Key_F7: b"\x1b\x37",
QtCore.Qt.Key_F8: b"\x1b\x38",
QtCore.Qt.Key_F9: b"\x1b\x39",
QtCore.Qt.Key_F10: b"\x1b\x30",
QtCore.Qt.Key_F11: b"\x45",
QtCore.Qt.Key_F12: b"\x46",
}
def QtKeyToAscii(event):
"""
Convert the Qt key event to the corresponding ASCII sequence for
the terminal. This works fine for standard alphanumerical characters, but
most other characters require terminal specific control sequences.
The conversion below works for TERM="linux" terminals.
"""
if sys.platform == "darwin":
# special case for MacOS
# /!\ Qt maps ControlModifier to CMD
# CMD-C, CMD-V for copy/paste
# CTRL-C and other modifiers -> key mapping
if event.modifiers() == QtCore.Qt.MetaModifier:
if event.key() == Qt.Key_Backspace:
return control_keys_mapping.get(Qt.Key_W)
return control_keys_mapping.get(event.key())
elif event.modifiers() == QtCore.Qt.ControlModifier:
if event.key() == Qt.Key_C:
# copy
return "copy"
elif event.key() == Qt.Key_V:
# paste
return "paste"
return None
else:
return normal_keys_mapping.get(event.key(), event.text().encode("utf8"))
if event.modifiers() == QtCore.Qt.ControlModifier:
return control_keys_mapping.get(event.key())
else:
return normal_keys_mapping.get(event.key(), event.text().encode("utf8"))
class Screen(pyte.HistoryScreen):
def __init__(self, stdin_fd, cols, rows, historyLength):
super().__init__(cols, rows, historyLength, ratio=1 / rows)
self._fd = stdin_fd
def write_process_input(self, data):
"""Response to CPR request (for example),
this can be for other requests
"""
try:
os.write(self._fd, data.encode("utf-8"))
except (IOError, OSError):
pass
def resize(self, lines, columns):
lines = lines or self.lines
columns = columns or self.columns
if lines == self.lines and columns == self.columns:
return # No changes.
self.dirty.clear()
self.dirty.update(range(lines))
self.save_cursor()
if lines < self.lines:
if lines <= self.cursor.y:
nlines_to_move_up = self.lines - lines
for i in range(nlines_to_move_up):
line = self.buffer[i] # .pop(0)
self.history.top.append(line)
self.cursor_position(0, 0)
self.delete_lines(nlines_to_move_up)
self.restore_cursor()
self.cursor.y -= nlines_to_move_up
else:
self.restore_cursor()
self.lines, self.columns = lines, columns
self.history = History(
self.history.top,
self.history.bottom,
1 / self.lines,
self.history.size,
self.history.position,
)
self.set_margins()
class Backend(QtCore.QObject):
"""
Poll Bash.
This class will run as a qsocketnotifier (started in ``_TerminalWidget``) and poll the
file descriptor of the Bash terminal.
"""
# Signals to communicate with ``_TerminalWidget``.
dataReady = pyqtSignal(object)
processExited = pyqtSignal()
def __init__(self, fd, cols, rows):
super().__init__()
# File descriptor that connects to Bash process.
self.fd = fd
# Setup Pyte (hard coded display size for now).
self.screen = Screen(self.fd, cols, rows, 10000)
self.stream = pyte.ByteStream()
self.stream.attach(self.screen)
self.notifier = QSocketNotifier(fd, QSocketNotifier.Read)
self.notifier.activated.connect(self._fd_readable)
def _fd_readable(self):
"""
Poll the Bash output, run it through Pyte, and notify
"""
# Read the shell output until the file descriptor is closed.
try:
out = os.read(self.fd, 2**16)
except OSError:
self.processExited.emit()
self.notifier.setEnabled(False)
return
# Feed output into Pyte's state machine and send the new screen
# output to the GUI
self.stream.feed(out)
self.dataReady.emit(self.screen)
class BECConsole(QtWidgets.QWidget):
"""Container widget for the terminal text area"""
PLUGIN = True
ICON_NAME = "terminal"
prompt = pyqtSignal(bool)
def __init__(self, parent=None, cols=132):
super().__init__(parent)
self.term = _TerminalWidget(self, cols, rows=43)
self.term.prompt.connect(self.prompt) # forward signal from term to this widget
self.scroll_bar = QScrollBar(Qt.Vertical, self)
# self.scroll_bar.hide()
layout = QHBoxLayout(self)
layout.addWidget(self.term)
layout.addWidget(self.scroll_bar)
layout.setAlignment(Qt.AlignLeft | Qt.AlignTop)
layout.setContentsMargins(0, 0, 0, 0)
self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Expanding)
pal = QPalette()
self.set_bgcolor(pal.window().color())
self.set_fgcolor(pal.windowText().color())
self.term.set_scroll_bar(self.scroll_bar)
self.set_cmd("bec --nogui")
self._check_designer_timer = QTimer()
self._check_designer_timer.timeout.connect(self.check_designer)
self._check_designer_timer.start(1000)
def minimumSizeHint(self):
size = self.term.sizeHint()
size.setWidth(size.width() + self.scroll_bar.width())
return size
def sizeHint(self):
return self.minimumSizeHint()
def check_designer(self, calls={"n": 0}):
calls["n"] += 1
if self.term.fd is not None:
# already started
self._check_designer_timer.stop()
elif self.window().windowTitle().endswith("[Preview]"):
# assuming Designer preview -> start
self._check_designer_timer.stop()
self.term.start()
elif calls["n"] >= 3:
# assuming not in Designer -> stop checking
self._check_designer_timer.stop()
def get_rows(self):
return self.term.rows
def set_rows(self, rows):
self.term.rows = rows
self.adjustSize()
self.updateGeometry()
def get_cols(self):
return self.term.cols
def set_cols(self, cols):
self.term.cols = cols
self.adjustSize()
self.updateGeometry()
def get_bgcolor(self):
return QColor.fromString(self.term.bg_color)
def set_bgcolor(self, color):
self.term.bg_color = color.name(QColor.HexRgb)
def get_fgcolor(self):
return QColor.fromString(self.term.fg_color)
def set_fgcolor(self, color):
self.term.fg_color = color.name(QColor.HexRgb)
def get_cmd(self):
return self.term._cmd
def set_cmd(self, cmd):
self.term._cmd = cmd
if self.term.fd is None:
# not started yet
self.term.clear()
self.term.appendHtml(f"<h2>BEC Console - {repr(cmd)}</h2>")
def start(self, deactivate_ctrl_d=True):
self.term.start(deactivate_ctrl_d=deactivate_ctrl_d)
def push(self, text, hit_return=False):
"""Push some text to the terminal"""
return self.term.push(text, hit_return=hit_return)
def execute_command(self, command):
self.push(command, hit_return=True)
def set_prompt_tokens(self, *tokens):
"""Prepare regexp to identify prompt, based on tokens
Tokens are returned from get_ipython().prompts.in_prompt_tokens()
"""
regex_parts = []
for token_type, token_value in tokens:
if token_type == Token.PromptNum: # Handle dynamic prompt number
regex_parts.append(r"[\d\?]+") # Match one or more digits or '?'
else:
# Escape other prompt parts (e.g., "In [", "]: ")
if not token_value:
regex_parts.append(".+?") # arbitrary string
else:
regex_parts.append(re.escape(token_value))
# Combine into a single regex
prompt_pattern = "".join(regex_parts)
self.term._prompt_re = re.compile(prompt_pattern + r"\s*$")
def terminate(self, timeout=10):
self.term.stop(timeout=timeout)
def send_ctrl_c(self, timeout=None):
self.term.send_ctrl_c(timeout)
cols = pyqtProperty(int, get_cols, set_cols)
rows = pyqtProperty(int, get_rows, set_rows)
bgcolor = pyqtProperty(QColor, get_bgcolor, set_bgcolor)
fgcolor = pyqtProperty(QColor, get_fgcolor, set_fgcolor)
cmd = pyqtProperty(str, get_cmd, set_cmd)
class _TerminalWidget(QtWidgets.QPlainTextEdit):
"""
Start ``Backend`` process and render Pyte output as text.
"""
prompt = pyqtSignal(bool)
def __init__(self, parent, cols=125, rows=50, **kwargs):
# regexp to match prompt
self._prompt_re = None
# last prompt
self._prompt_str = None
# process pid
self.pid = None
# file descriptor to communicate with the subprocess
self.fd = None
self.backend = None
# command to execute
self._cmd = ""
# should ctrl-d be deactivated ? (prevent Python exit)
self._deactivate_ctrl_d = False
# Default colors
pal = QPalette()
self._fg_color = pal.text().color().name()
self._bg_color = pal.base().color().name()
# Specify the terminal size in terms of lines and columns.
self._rows = rows
self._cols = cols
self.output = collections.deque()
super().__init__(parent)
self.setSizePolicy(QSizePolicy.MinimumExpanding, QSizePolicy.Expanding)
# Disable default scrollbars (we use our own, to be set via .set_scroll_bar())
self.setVerticalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
self.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
self.scroll_bar = None
# Use Monospace fonts and disable line wrapping.
self.setFont(QtGui.QFont("Courier", 9))
self.setFont(QtGui.QFont("Monospace"))
self.setLineWrapMode(QtWidgets.QPlainTextEdit.NoWrap)
fmt = QtGui.QFontMetrics(self.font())
char_width = fmt.width("w")
self.setCursorWidth(char_width)
self.adjustSize()
self.updateGeometry()
self.update_stylesheet()
@property
def bg_color(self):
return self._bg_color
@bg_color.setter
def bg_color(self, hexcolor):
self._bg_color = hexcolor
self.update_stylesheet()
@property
def fg_color(self):
return self._fg_color
@fg_color.setter
def fg_color(self, hexcolor):
self._fg_color = hexcolor
self.update_stylesheet()
def update_stylesheet(self):
self.setStyleSheet(
f"QPlainTextEdit {{ border: 0; color: {self._fg_color}; background-color: {self._bg_color}; }} "
)
@property
def rows(self):
return self._rows
@rows.setter
def rows(self, rows: int):
if self.backend is None:
# not initialized yet, ok to change
self._rows = rows
self.adjustSize()
self.updateGeometry()
else:
raise RuntimeError("Cannot change rows after console is started.")
@property
def cols(self):
return self._cols
@cols.setter
def cols(self, cols: int):
if self.fd is None:
# not initialized yet, ok to change
self._cols = cols
self.adjustSize()
self.updateGeometry()
else:
raise RuntimeError("Cannot change cols after console is started.")
def start(self, deactivate_ctrl_d: bool = False):
self._deactivate_ctrl_d = deactivate_ctrl_d
self.update_term_size()
# Start the Bash process
self.pid, self.fd = self.fork_shell()
if self.fd:
# Create the ``Backend`` object
self.backend = Backend(self.fd, self.cols, self.rows)
self.backend.dataReady.connect(self.data_ready)
self.backend.processExited.connect(self.process_exited)
else:
self.process_exited()
def process_exited(self):
self.fd = None
self.clear()
self.appendHtml(f"<br><h2>{repr(self._cmd)} - Process exited.</h2>")
self.setReadOnly(True)
def send_ctrl_c(self, wait_prompt=True, timeout=None):
"""Send CTRL-C to the process
If wait_prompt=True (default), wait for a new prompt after CTRL-C
If no prompt is displayed after 'timeout' seconds, TimeoutError is raised
"""
os.kill(self.pid, signal.SIGINT)
if wait_prompt:
timeout_error = False
if timeout:
def set_timeout_error():
nonlocal timeout_error
timeout_error = True
timeout_timer = QTimer()
timeout_timer.singleShot(timeout * 1000, set_timeout_error)
while self._prompt_str is None:
QApplication.instance().process_events()
if timeout_error:
raise TimeoutError(
f"CTRL-C: could not get back to prompt after {timeout} seconds."
)
def _is_running(self):
if os.waitpid(self.pid, os.WNOHANG) == (0, 0):
return True
return False
def stop(self, kill=True, timeout=None):
"""Stop the running process
SIGTERM is the default signal for terminating processes.
If kill=True (default), SIGKILL will be sent if the process does not exit after timeout
"""
# try to exit gracefully
os.kill(self.pid, signal.SIGTERM)
# wait until process is truly dead
t0 = time.perf_counter()
while self._is_running():
time.sleep(1)
if timeout is not None and time.perf_counter() - t0 > timeout:
# still alive after 'timeout' seconds
if kill:
# send SIGKILL and make a last check in loop
os.kill(self.pid, signal.SIGKILL)
kill = False
else:
# still running after timeout...
raise TimeoutError(
f"Could not terminate process with pid: {self.pid} within timeout"
)
self.process_exited()
def data_ready(self, screen):
"""Handle new screen: redraw, set scroll bar max and slider, move cursor to its position
This method is triggered via a signal from ``Backend``.
"""
self.redraw_screen()
self.adjust_scroll_bar()
self.move_cursor()
def minimumSizeHint(self):
"""Return minimum size for current cols and rows"""
fmt = QtGui.QFontMetrics(self.font())
char_width = fmt.width("w")
char_height = fmt.height()
width = char_width * self.cols
height = char_height * self.rows
return QSize(width, height)
def sizeHint(self):
return self.minimumSizeHint()
def set_scroll_bar(self, scroll_bar):
self.scroll_bar = scroll_bar
self.scroll_bar.setMinimum(0)
self.scroll_bar.valueChanged.connect(self.scroll_value_change)
def scroll_value_change(self, value, old={"value": -1}):
if self.backend is None:
return
if old["value"] == -1:
old["value"] = self.scroll_bar.maximum()
if value <= old["value"]:
# scroll up
# value is number of lines from the start
nlines = old["value"] - value
# history ratio gives prev_page == 1 line
for i in range(nlines):
self.backend.screen.prev_page()
else:
# scroll down
nlines = value - old["value"]
for i in range(nlines):
self.backend.screen.next_page()
old["value"] = value
self.redraw_screen()
def adjust_scroll_bar(self):
sb = self.scroll_bar
sb.valueChanged.disconnect(self.scroll_value_change)
tmp = len(self.backend.screen.history.top) + len(self.backend.screen.history.bottom)
sb.setMaximum(tmp if tmp > 0 else 0)
sb.setSliderPosition(tmp if tmp > 0 else 0)
# if tmp > 0:
# # show scrollbar, but delayed - prevent recursion with widget size change
# QTimer.singleShot(0, scrollbar.show)
# else:
# QTimer.singleShot(0, scrollbar.hide)
sb.valueChanged.connect(self.scroll_value_change)
def write(self, data):
try:
os.write(self.fd, data)
except (IOError, OSError):
self.process_exited()
@Slot(object)
def keyPressEvent(self, event):
"""
Redirect all keystrokes to the terminal process.
"""
if self.fd is None:
# not started
return
# Convert the Qt key to the correct ASCII code.
if (
self._deactivate_ctrl_d
and event.modifiers() == QtCore.Qt.ControlModifier
and event.key() == QtCore.Qt.Key_D
):
return None
code = QtKeyToAscii(event)
if code == "copy":
# MacOS only: CMD-C handling
self.copy()
elif code == "paste":
# MacOS only: CMD-V handling
self._push_clipboard()
elif code is not None:
self.write(code)
def push(self, text, hit_return=False):
"""
Write 'text' to terminal
"""
self.write(text.encode("utf-8"))
if hit_return:
self.write(b"\n")
def contextMenuEvent(self, event):
if self.fd is None:
return
menu = self.createStandardContextMenu()
for action in menu.actions():
# remove all actions except copy and paste
if "opy" in action.text():
# redefine text without shortcut
# since it probably clashes with control codes (like CTRL-C etc)
action.setText("Copy")
continue
if "aste" in action.text():
# redefine text without shortcut
action.setText("Paste")
# paste -> have to insert with self.push
action.triggered.connect(self._push_clipboard)
continue
menu.removeAction(action)
menu.exec_(event.globalPos())
def _push_clipboard(self):
clipboard = QApplication.instance().clipboard()
self.push(clipboard.text())
def move_cursor(self):
textCursor = self.textCursor()
textCursor.setPosition(0)
textCursor.movePosition(
QTextCursor.Down, QTextCursor.MoveAnchor, self.backend.screen.cursor.y
)
textCursor.movePosition(
QTextCursor.Right, QTextCursor.MoveAnchor, self.backend.screen.cursor.x
)
self.setTextCursor(textCursor)
def mouseReleaseEvent(self, event):
if self.fd is None:
return
if event.button() == Qt.MiddleButton:
# push primary selection buffer ("mouse clipboard") to terminal
clipboard = QApplication.instance().clipboard()
if clipboard.supportsSelection():
self.push(clipboard.text(QClipboard.Selection))
return None
elif event.button() == Qt.LeftButton:
# left button click
textCursor = self.textCursor()
if textCursor.selectedText():
# mouse was used to select text -> nothing to do
pass
else:
# a simple 'click', move scrollbar to end
self.scroll_bar.setSliderPosition(self.scroll_bar.maximum())
self.move_cursor()
return None
return super().mouseReleaseEvent(event)
def redraw_screen(self):
"""
Render the screen as formatted text into the widget.
"""
screen = self.backend.screen
# Clear the widget
if screen.dirty:
self.clear()
while len(self.output) < (max(screen.dirty) + 1):
self.output.append("")
while len(self.output) > (max(screen.dirty) + 1):
self.output.pop()
# Prepare the HTML output
for line_no in screen.dirty:
line = text = ""
style = old_style = ""
old_idx = 0
for idx, ch in screen.buffer[line_no].items():
text += " " * (idx - old_idx - 1)
old_idx = idx
style = f"{'background-color:%s;' % ansi_colors.get(ch.bg, ansi_colors['black']) if ch.bg!='default' else ''}{'color:%s;' % ansi_colors.get(ch.fg, ansi_colors['white']) if ch.fg!='default' else ''}{'font-weight:bold;' if ch.bold else ''}{'font-style:italic;' if ch.italics else ''}"
if style != old_style:
if old_style:
line += f"<span style={repr(old_style)}>{html.escape(text, quote=True)}</span>"
else:
line += html.escape(text, quote=True)
text = ""
old_style = style
text += ch.data
if style:
line += f"<span style={repr(style)}>{html.escape(text, quote=True)}</span>"
else:
line += html.escape(text, quote=True)
# do a check at the cursor position:
# it is possible x pos > output line length,
# for example if last escape codes are "cursor forward" past end of text,
# like IPython does for "..." prompt (in a block, like "for" loop or "while" for example)
# In this case, cursor is at 12 but last text output is at 8 -> insert spaces
if line_no == screen.cursor.y:
llen = len(screen.buffer[line_no])
if llen < screen.cursor.x:
line += " " * (screen.cursor.x - llen)
self.output[line_no] = line
# fill the text area with HTML contents in one go
self.appendHtml(f"<pre>{chr(10).join(self.output)}</pre>")
if self._prompt_re is not None:
text_buf = self.toPlainText()
prompt = self._prompt_re.search(text_buf)
if prompt is None:
if self._prompt_str:
self.prompt.emit(False)
self._prompt_str = None
else:
prompt_str = prompt.string.rstrip()
if prompt_str != self._prompt_str:
self._prompt_str = prompt_str
self.prompt.emit(True)
# did updates, all clean
screen.dirty.clear()
def update_term_size(self):
fmt = QtGui.QFontMetrics(self.font())
char_width = fmt.width("w")
char_height = fmt.height()
self._cols = int(self.width() / char_width)
self._rows = int(self.height() / char_height)
def resizeEvent(self, event):
self.update_term_size()
if self.fd:
self.backend.screen.resize(self._rows, self._cols)
self.redraw_screen()
self.adjust_scroll_bar()
self.move_cursor()
def wheelEvent(self, event):
if not self.fd:
return
y = event.angleDelta().y()
if y > 0:
self.backend.screen.prev_page()
else:
self.backend.screen.next_page()
self.redraw_screen()
def fork_shell(self):
"""
Fork the current process and execute bec in shell.
"""
try:
pid, fd = pty.fork()
except (IOError, OSError):
return False
if pid == 0:
try:
ls = os.environ["LANG"].split(".")
except KeyError:
ls = []
if len(ls) < 2:
ls = ["en_US", "UTF-8"]
os.putenv("COLUMNS", str(self.cols))
os.putenv("LINES", str(self.rows))
os.putenv("TERM", "linux")
os.putenv("LANG", ls[0] + ".UTF-8")
if not self._cmd:
self._cmd = os.environ["SHELL"]
cmd = self._cmd
if isinstance(cmd, str):
cmd = cmd.split()
try:
os.execvp(cmd[0], cmd)
except (IOError, OSError):
pass
os._exit(0)
else:
# We are in the parent process.
# Set file control
fcntl.fcntl(fd, fcntl.F_SETFL, os.O_NONBLOCK)
return pid, fd
if __name__ == "__main__":
import os
import sys
from qtpy import QtGui, QtWidgets
# Create the Qt application and console.
app = QtWidgets.QApplication([])
mainwin = QtWidgets.QMainWindow()
title = "BECConsole"
mainwin.setWindowTitle(title)
console = BECConsole(mainwin)
mainwin.setCentralWidget(console)
def check_prompt(at_prompt):
if at_prompt:
print("NEW PROMPT")
else:
print("EXECUTING SOMETHING...")
console.set_prompt_tokens(
(Token.OutPromptNum, ""),
(Token.Prompt, ""), # will match arbitrary string,
(Token.Prompt, " ["),
(Token.PromptNum, "3"),
(Token.Prompt, "/"),
(Token.PromptNum, "1"),
(Token.Prompt, "] "),
(Token.Prompt, ""),
)
console.prompt.connect(check_prompt)
console.start()
# Show widget and launch Qt's event loop.
mainwin.show()
sys.exit(app.exec_())

View File

@@ -1 +0,0 @@
{'files': ['console.py']}

View File

@@ -1,58 +0,0 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
import os
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
import bec_widgets
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.editors.console.console import BECConsole
DOM_XML = """
<ui language='c++'>
<widget class='BECConsole' name='bec_console'>
</widget>
</ui>
"""
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
class BECConsolePlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
t = BECConsole(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return "BEC Console"
def icon(self):
return designer_material_icon(BECConsole.ICON_NAME)
def includeFile(self):
return "bec_console"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "BECConsole"
def toolTip(self):
return "A terminal-like vt100 widget."
def whatsThis(self):
return self.toolTip()

View File

@@ -1,15 +0,0 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.editors.console.console_plugin import BECConsolePlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(BECConsolePlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -9,8 +9,19 @@ import pyqtgraph as pg
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
from pydantic import Field, ValidationError, field_validator
from qtpy.QtCore import QTimer, Signal
from qtpy.QtWidgets import QApplication, QDialog, QHBoxLayout, QMainWindow, QVBoxLayout, QWidget
from qtpy.QtCore import Qt, QTimer, Signal
from qtpy.QtWidgets import (
QApplication,
QCheckBox,
QDialog,
QDialogButtonBox,
QDoubleSpinBox,
QHBoxLayout,
QLabel,
QMainWindow,
QVBoxLayout,
QWidget,
)
from bec_widgets.utils import ConnectionConfig
from bec_widgets.utils.bec_signal_proxy import BECSignalProxy
@@ -33,6 +44,11 @@ class WaveformConfig(ConnectionConfig):
color_palette: str | None = Field(
"plasma", description="The color palette of the figure widget.", validate_default=True
)
max_dataset_size_mb: float = Field(
10,
description="Maximum dataset size (in MB) permitted when fetching async data from history before prompting the user.",
validate_default=True,
)
model_config: dict = {"validate_assignment": True}
_validate_color_palette = field_validator("color_palette")(Colors.validate_color_map)
@@ -96,6 +112,12 @@ class Waveform(PlotBase):
"x_entry.setter",
"color_palette",
"color_palette.setter",
"skip_large_dataset_warning",
"skip_large_dataset_warning.setter",
"skip_large_dataset_check",
"skip_large_dataset_check.setter",
"max_dataset_size_mb",
"max_dataset_size_mb.setter",
"plot",
"add_dap_curve",
"remove_curve",
@@ -144,6 +166,7 @@ class Waveform(PlotBase):
self._mode: Literal["none", "sync", "async", "mixed"] = "none"
# Scan data
self._scan_done = True # means scan is not running
self.old_scan_id = None
self.scan_id = None
self.scan_item = None
@@ -163,6 +186,10 @@ class Waveform(PlotBase):
self._init_curve_dialog()
self.curve_settings_dialog = None
# Largedataset guard
self._skip_large_dataset_warning = False # session flag
self._skip_large_dataset_check = False # per-plot flag, to skip the warning for this plot
# Scan status update loop
self.bec_dispatcher.connect_slot(self.on_scan_status, MessageEndpoints.scan_status())
self.bec_dispatcher.connect_slot(self.on_scan_progress, MessageEndpoints.scan_progress())
@@ -561,6 +588,59 @@ class Waveform(PlotBase):
"""
return [item for item in self.plot_item.curves if isinstance(item, Curve)]
@SafeProperty(bool)
def skip_large_dataset_check(self) -> bool:
"""
Whether to skip the large dataset warning when fetching async data.
"""
return self._skip_large_dataset_check
@skip_large_dataset_check.setter
def skip_large_dataset_check(self, value: bool):
"""
Set whether to skip the large dataset warning when fetching async data.
Args:
value(bool): Whether to skip the large dataset warning.
"""
self._skip_large_dataset_check = value
@SafeProperty(bool)
def skip_large_dataset_warning(self) -> bool:
"""
Whether to skip the large dataset warning when fetching async data.
"""
return self._skip_large_dataset_warning
@skip_large_dataset_warning.setter
def skip_large_dataset_warning(self, value: bool):
"""
Set whether to skip the large dataset warning when fetching async data.
Args:
value(bool): Whether to skip the large dataset warning.
"""
self._skip_large_dataset_warning = value
@SafeProperty(float)
def max_dataset_size_mb(self) -> float:
"""
The maximum dataset size (in MB) permitted when fetching async data from history before prompting the user.
"""
return self.config.max_dataset_size_mb
@max_dataset_size_mb.setter
def max_dataset_size_mb(self, value: float):
"""
Set the maximum dataset size (in MB) permitted when fetching async data from history before prompting the user.
Args:
value(float): The maximum dataset size in MB.
"""
if value <= 0:
raise ValueError("Maximum dataset size must be greater than 0.")
self.config.max_dataset_size_mb = value
################################################################################
# High Level methods for API
################################################################################
@@ -807,8 +887,6 @@ class Waveform(PlotBase):
if config.source == "device":
if self.scan_item is None:
self.update_with_scan_history(-1)
if curve in self._async_curves:
self._setup_async_curve(curve)
self.async_signal_update.emit()
self.sync_signal_update.emit()
if config.source == "dap":
@@ -1056,8 +1134,8 @@ class Waveform(PlotBase):
meta(dict): The message metadata.
"""
self.sync_signal_update.emit()
status = msg.get("done")
if status:
self._scan_done = msg.get("done")
if self._scan_done:
QTimer.singleShot(100, self.update_sync_curves)
QTimer.singleShot(300, self.update_sync_curves)
@@ -1135,9 +1213,11 @@ class Waveform(PlotBase):
if access_key == "val": # live access
device_data = data.get(device_name, {}).get(device_entry, {}).get(access_key, None)
else: # history access
device_data = (
data.get(device_name, {}).get(device_entry, {}).read().get("value", None)
)
dataset_obj = data.get(device_name, {})
if self._skip_large_dataset_check is False:
if not self._check_dataset_size_and_confirm(dataset_obj, device_entry):
continue # user declined to load; skip this curve
device_data = dataset_obj.get(device_entry, {}).read().get("value", None)
# if shape is 2D cast it into 1D and take the last waveform
if len(np.shape(device_data)) > 1:
@@ -1581,6 +1661,8 @@ class Waveform(PlotBase):
dev_name = curve.config.signal.name
if dev_name in readout_priority_async:
self._async_curves.append(curve)
if hasattr(self.scan_item, "live_data"):
self._setup_async_curve(curve)
found_async = True
elif dev_name in readout_priority_sync:
self._sync_curves.append(curve)
@@ -1657,6 +1739,106 @@ class Waveform(PlotBase):
################################################################################
# Utility Methods
################################################################################
# Large dataset handling helpers
def _check_dataset_size_and_confirm(self, dataset_obj, device_entry: str) -> bool:
"""
Check the size of the dataset and confirm with the user if it exceeds the limit.
Args:
dataset_obj: The dataset object containing the information.
device_entry( str): The specific device entry to check.
Returns:
bool: True if the dataset is within the size limit or user confirmed to load it,
False if the dataset exceeds the size limit and user declined to load it.
"""
try:
info = dataset_obj._info
mem_bytes = info.get(device_entry, {}).get("value", {}).get("mem_size", 0)
# Fallback grab first entry if lookup failed
if mem_bytes == 0 and info:
first_key = next(iter(info))
mem_bytes = info[first_key]["value"]["mem_size"]
size_mb = mem_bytes / (1024 * 1024)
print(f"Dataset size: {size_mb:.1f} MB")
except Exception as exc: # noqa: BLE001
logger.error(f"Unable to evaluate dataset size: {exc}")
return True
if size_mb <= self.config.max_dataset_size_mb:
return True
logger.warning(
f"Attempt to load large dataset: {size_mb:.1f} MB "
f"(limit {self.config.max_dataset_size_mb} MB)"
)
if self._skip_large_dataset_warning:
logger.info("Skipping large dataset warning dialog.")
return False
return self._confirm_large_dataset(size_mb)
def _confirm_large_dataset(self, size_mb: float) -> bool:
"""
Confirm with the user whether to load a large dataset with dialog popup.
Also allows the user to adjust the maximum dataset size limit and if user
wants to see this popup again during session.
Args:
size_mb(float): Size of the dataset in MB.
Returns:
bool: True if the user confirmed to load the dataset, False otherwise.
"""
if self._skip_large_dataset_warning:
return True
dialog = QDialog(self)
dialog.setWindowTitle("Large dataset detected")
main_dialog_layout = QVBoxLayout(dialog)
# Limit adjustment widgets
limit_adjustment_layout = QHBoxLayout()
limit_adjustment_layout.addWidget(QLabel("New limit (MB):"))
spin = QDoubleSpinBox()
spin.setRange(0.001, 4096)
spin.setDecimals(3)
spin.setSingleStep(0.01)
spin.setValue(self.config.max_dataset_size_mb)
spin.valueChanged.connect(lambda value: setattr(self.config, "max_dataset_size_mb", value))
limit_adjustment_layout.addWidget(spin)
# Don't show again checkbox
checkbox = QCheckBox("Don't show this again for this session")
buttons = QDialogButtonBox(
QDialogButtonBox.Yes | QDialogButtonBox.No, Qt.Horizontal, dialog
)
buttons.accepted.connect(dialog.accept) # Yes
buttons.rejected.connect(dialog.reject) # No
# widget layout
main_dialog_layout.addWidget(
QLabel(
f"The selected dataset is {size_mb:.1f} MB which exceeds the "
f"current limit of {self.config.max_dataset_size_mb} MB.\n"
)
)
main_dialog_layout.addLayout(limit_adjustment_layout)
main_dialog_layout.addWidget(checkbox)
main_dialog_layout.addWidget(QLabel("Would you like to display dataset anyway?"))
main_dialog_layout.addWidget(buttons)
result = dialog.exec() # modal; waits for user choice
# Respect the “don't show again” checkbox for *either* choice
if checkbox.isChecked():
self._skip_large_dataset_warning = True
if result == QDialog.Accepted:
self.config.max_dataset_size_mb = spin.value()
return True
return False
def _ensure_str_list(self, entries: list | tuple | np.ndarray):
"""
Convert a variety of possible inputs (string, bytes, list/tuple/ndarray of either)
@@ -1787,7 +1969,7 @@ class DemoApp(QMainWindow): # pragma: no cover
self.setCentralWidget(self.main_widget)
self.waveform_popup = Waveform(popups=True)
self.waveform_popup.plot(y_name="monitor_async")
self.waveform_popup.plot(y_name="waveform")
self.waveform_side = Waveform(popups=False)
self.waveform_side.plot(y_name="bpm4i", y_entry="bpm4i", dap="GaussianModel")

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "bec_widgets"
version = "2.9.2"
version = "2.10.2"
description = "BEC Widgets"
requires-python = ">=3.10"
classifiers = [
@@ -21,7 +21,6 @@ dependencies = [
"pydantic~=2.0",
"pyqtgraph~=0.13",
"PySide6~=6.8.2",
"pyte", # needed for vt100 console
"qtconsole~=5.5, >=5.5.1", # needed for jupyter console
"qtpy~=2.4",
]

View File

@@ -1,65 +0,0 @@
import sys
import threading
import time
import pytest
from pygments.token import Token
from qtpy.QtCore import QEventLoop
from bec_widgets.utils.colors import apply_theme
from bec_widgets.widgets.editors.console.console import BECConsole
@pytest.fixture
def console_widget(qtbot):
apply_theme("light")
console = BECConsole()
console.set_cmd(sys.executable) # will launch Python interpreter
console.set_prompt_tokens((Token.Prompt, ">>>"))
qtbot.addWidget(console)
console.show()
qtbot.waitExposed(console)
yield console
console.terminate()
def test_console_widget(console_widget, qtbot, tmp_path):
def wait_prompt(command_to_execute=None, busy=False):
signal_waiter = QEventLoop()
def exit_loop(idle):
if busy and not idle:
signal_waiter.quit()
elif not busy and idle:
signal_waiter.quit()
console_widget.prompt.connect(exit_loop)
if command_to_execute:
if callable(command_to_execute):
command_to_execute()
else:
console_widget.execute_command(command_to_execute)
signal_waiter.exec_()
console_widget.start()
wait_prompt()
# use console to write something to a tmp file
tmp_filename = str(tmp_path / "console_test.txt")
wait_prompt(f"f = open('{tmp_filename}', 'wt'); f.write('HELLO CONSOLE'); f.close()")
# check the code has been executed by console, by checking the tmp file contents
with open(tmp_filename, "rt") as f:
assert f.read() == "HELLO CONSOLE"
# execute a sleep
t0 = time.perf_counter()
wait_prompt("import time; time.sleep(1)")
assert time.perf_counter() - t0 >= 1
# test ctrl-c
t0 = time.perf_counter()
wait_prompt("time.sleep(5)", busy=True)
wait_prompt(console_widget.send_ctrl_c)
assert (
time.perf_counter() - t0 < 1
) # in reality it will be almost immediate, but ok we can say less than 1 second compared to 5

View File

@@ -1,3 +1,5 @@
from __future__ import annotations
import json
from types import SimpleNamespace
from unittest import mock
@@ -7,6 +9,15 @@ import numpy as np
import pyqtgraph as pg
import pytest
from pyqtgraph.graphicsItems.DateAxisItem import DateAxisItem
from qtpy.QtCore import QTimer
from qtpy.QtWidgets import (
QApplication,
QCheckBox,
QDialog,
QDialogButtonBox,
QDoubleSpinBox,
QSpinBox,
)
from bec_widgets.widgets.plots.plot_base import UIMode
from bec_widgets.widgets.plots.waveform.curve import DeviceSignal
@@ -533,6 +544,7 @@ def test_on_async_readback_add_update(qtbot, mocked_client):
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
wf.scan_item = create_dummy_scan_item()
wf._scan_done = False # simulate a live scan
c = wf.plot(arg1="async_device", label="async_device-async_device")
wf._async_curves = [c]
# Suppose existing data
@@ -819,3 +831,227 @@ def test_show_dap_summary_popup(qtbot, mocked_client):
wf.dap_summary_dialog.close()
assert wf.dap_summary_dialog is None
assert fit_action.isChecked() is False
#####################################################
# The following tests are for the async dataset guard
#####################################################
def test_skip_large_dataset_warning_property(qtbot, mocked_client):
"""
Verify the getter and setter of skip_large_dataset_warning work correctly.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
# Default should be False
assert wf.skip_large_dataset_warning is False
# Set to True
wf.skip_large_dataset_warning = True
assert wf.skip_large_dataset_warning is True
# Toggle back to False
wf.skip_large_dataset_warning = False
assert wf.skip_large_dataset_warning is False
def test_max_dataset_size_mb_property(qtbot, mocked_client):
"""
Verify getter, setter, and validation of max_dataset_size_mb.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
# Default from WaveformConfig is 1 MB
assert wf.max_dataset_size_mb == 10
# Set to a valid new value
wf.max_dataset_size_mb = 5.5
assert wf.max_dataset_size_mb == 5.5
# Ensure the config is updated too
assert wf.config.max_dataset_size_mb == 5.5
def _dummy_dataset(mem_bytes: int, entry: str = "waveform_waveform"):
"""
Return an object that mimics the BEC dataset structure:
it has exactly one attribute `_info` with the expected layout.
"""
return SimpleNamespace(_info={entry: {"value": {"mem_size": mem_bytes}}})
def test_dataset_guard_under_limit(qtbot, mocked_client, monkeypatch):
"""
Dataset below the limit should load without triggering the dialog.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
wf.max_dataset_size_mb = 1 # 1 MiB
# If the dialog is called, we flip this flag it must stay False.
called = {"dlg": False}
monkeypatch.setattr(
Waveform, "_confirm_large_dataset", lambda self, size_mb: called.__setitem__("dlg", True)
)
dataset = _dummy_dataset(mem_bytes=512_000) # ≈0.49 MiB
assert wf._check_dataset_size_and_confirm(dataset, "waveform_waveform") is True
assert called["dlg"] is False
def test_dataset_guard_over_limit_accept(qtbot, mocked_client, monkeypatch):
"""
Dataset above the limit where user presses *Yes*.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
wf.max_dataset_size_mb = 1 # 1 MiB
# Pretend the user clicked “Yes”
monkeypatch.setattr(Waveform, "_confirm_large_dataset", lambda *_: True)
dataset = _dummy_dataset(mem_bytes=2_000_000) # ≈1.9 MiB
assert wf._check_dataset_size_and_confirm(dataset, "waveform_waveform") is True
def test_dataset_guard_over_limit_reject(qtbot, mocked_client, monkeypatch):
"""
Dataset above the limit where user presses *No*.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
wf.max_dataset_size_mb = 1 # 1 MiB
# Pretend the user clicked “No”
monkeypatch.setattr(Waveform, "_confirm_large_dataset", lambda *_: False)
dataset = _dummy_dataset(mem_bytes=2_000_000) # ≈1.9 MiB
assert wf._check_dataset_size_and_confirm(dataset, "waveform_waveform") is False
##################################################
# Dialog propagation behaviour
##################################################
def test_dialog_accept_updates_limit(monkeypatch, qtbot, mocked_client):
"""
Simulate clicking 'Yes' in the dialog *after* changing the spinner value.
Verify max_dataset_size_mb is updated and dataset loads.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
wf.max_dataset_size_mb = 1 # start small
def fake_confirm(self, size_mb):
# Simulate user typing '5' in the spinbox then pressing Yes
self.config.max_dataset_size_mb = 5
return True # Yes pressed
monkeypatch.setattr(Waveform, "_confirm_large_dataset", fake_confirm)
big_dataset = _dummy_dataset(mem_bytes=4_800_000) # ≈4.6 MiB
accepted = wf._check_dataset_size_and_confirm(big_dataset, "waveform_waveform")
# The load should be accepted and the limit must reflect the new value
assert accepted is True
assert wf.max_dataset_size_mb == 5
assert wf.config.max_dataset_size_mb == 5
def test_dialog_cancel_sets_skip(monkeypatch, qtbot, mocked_client):
"""
Simulate clicking 'No' but ticking 'Don't show again'.
Verify skip_large_dataset_warning becomes True and dataset is skipped.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
assert wf.skip_large_dataset_warning is False
def fake_confirm(self, size_mb):
# Mimic ticking the checkbox then pressing No
self._skip_large_dataset_warning = True
return False # No pressed
monkeypatch.setattr(Waveform, "_confirm_large_dataset", fake_confirm)
big_dataset = _dummy_dataset(mem_bytes=11_000_000)
accepted = wf._check_dataset_size_and_confirm(big_dataset, "waveform_waveform")
# Dataset must not load, but future warnings are suppressed
assert accepted is False
assert wf.skip_large_dataset_warning is True
##################################################
# Live dialog interaction (no monkeypatching)
##################################################
def _open_dialog_and_click(handler):
"""
Utility that schedules *handler* to run as soon as a modal
dialog is shown. Returns a function suitable for QTimer.singleShot.
"""
def _cb():
# Locate the active modal dialog
dlg = QApplication.activeModalWidget()
assert isinstance(dlg, QDialog), "No active modal dialog found"
handler(dlg)
return _cb
def test_dialog_accept_real_interaction(qtbot, mocked_client):
"""
Endtoend: user changes the limit spinner to 5 MiB, ticks
'don't show again', then presses YES.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
wf.max_dataset_size_mb = 1
# Prepare a large dataset (≈4.6MiB)
big_dataset = _dummy_dataset(mem_bytes=4_800_000)
def handler(dlg):
spin: QDoubleSpinBox = dlg.findChild(QDoubleSpinBox)
chk: QCheckBox = dlg.findChild(QCheckBox)
btns: QDialogButtonBox = dlg.findChild(QDialogButtonBox)
# # Interact with widgets
spin.setValue(5)
chk.setChecked(True)
yes_btn = btns.button(QDialogButtonBox.Yes)
yes_btn.click()
# Schedule the handler right before invoking the check
QTimer.singleShot(0, _open_dialog_and_click(handler))
accepted = wf._check_dataset_size_and_confirm(big_dataset, "waveform_waveform")
assert accepted is True
assert wf.max_dataset_size_mb == 5
assert wf.skip_large_dataset_warning is True
def test_dialog_reject_real_interaction(qtbot, mocked_client):
"""
Endtoend: user leaves spinner unchanged, ticks 'don't show again',
and presses NO.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
wf.max_dataset_size_mb = 1
big_dataset = _dummy_dataset(mem_bytes=4_800_000)
def handler(dlg):
chk: QCheckBox = dlg.findChild(QCheckBox)
btns: QDialogButtonBox = dlg.findChild(QDialogButtonBox)
chk.setChecked(True)
no_btn = btns.button(QDialogButtonBox.No)
no_btn.click()
QTimer.singleShot(0, _open_dialog_and_click(handler))
accepted = wf._check_dataset_size_and_confirm(big_dataset, "waveform_waveform")
assert accepted is False
assert wf.skip_large_dataset_warning is True
# Limit remains unchanged
assert wf.max_dataset_size_mb == 1