diff --git a/bec_widgets/widgets/editors/console/__init__.py b/bec_widgets/widgets/editors/console/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/bec_widgets/widgets/editors/console/console.py b/bec_widgets/widgets/editors/console/console.py deleted file mode 100644 index d1d0d390..00000000 --- a/bec_widgets/widgets/editors/console/console.py +++ /dev/null @@ -1,870 +0,0 @@ -""" -BECConsole is a Qt widget that runs a Bash shell. - -BECConsole VT100 emulation is powered by Pyte, -(https://github.com/selectel/pyte). -""" - -import collections -import fcntl -import html -import os -import pty -import re -import signal -import sys -import time - -import pyte -from pygments.token import Token -from pyte.screens import History -from qtpy import QtCore, QtGui, QtWidgets -from qtpy.QtCore import Property as pyqtProperty -from qtpy.QtCore import QSize, QSocketNotifier, Qt, QTimer -from qtpy.QtCore import Signal as pyqtSignal -from qtpy.QtGui import QClipboard, QColor, QPalette, QTextCursor -from qtpy.QtWidgets import QApplication, QHBoxLayout, QScrollBar, QSizePolicy - -from bec_widgets.utils.error_popups import SafeSlot as Slot - -ansi_colors = { - "black": "#000000", - "red": "#CD0000", - "green": "#00CD00", - "brown": "#996633", # Brown, replacing the yellow - "blue": "#0000EE", - "magenta": "#CD00CD", - "cyan": "#00CDCD", - "white": "#E5E5E5", - "brightblack": "#7F7F7F", - "brightred": "#FF0000", - "brightgreen": "#00FF00", - "brightyellow": "#FFFF00", - "brightblue": "#5C5CFF", - "brightmagenta": "#FF00FF", - "brightcyan": "#00FFFF", - "brightwhite": "#FFFFFF", -} - -control_keys_mapping = { - QtCore.Qt.Key_A: b"\x01", # Ctrl-A - QtCore.Qt.Key_B: b"\x02", # Ctrl-B - QtCore.Qt.Key_C: b"\x03", # Ctrl-C - QtCore.Qt.Key_D: b"\x04", # Ctrl-D - QtCore.Qt.Key_E: b"\x05", # Ctrl-E - QtCore.Qt.Key_F: b"\x06", # Ctrl-F - QtCore.Qt.Key_G: b"\x07", # Ctrl-G (Bell) - QtCore.Qt.Key_H: b"\x08", # Ctrl-H (Backspace) - QtCore.Qt.Key_I: b"\x09", # Ctrl-I (Tab) - QtCore.Qt.Key_J: b"\x0a", # Ctrl-J (Line Feed) - QtCore.Qt.Key_K: b"\x0b", # Ctrl-K (Vertical Tab) - QtCore.Qt.Key_L: b"\x0c", # Ctrl-L (Form Feed) - QtCore.Qt.Key_M: b"\x0d", # Ctrl-M (Carriage Return) - QtCore.Qt.Key_N: b"\x0e", # Ctrl-N - QtCore.Qt.Key_O: b"\x0f", # Ctrl-O - QtCore.Qt.Key_P: b"\x10", # Ctrl-P - QtCore.Qt.Key_Q: b"\x11", # Ctrl-Q - QtCore.Qt.Key_R: b"\x12", # Ctrl-R - QtCore.Qt.Key_S: b"\x13", # Ctrl-S - QtCore.Qt.Key_T: b"\x14", # Ctrl-T - QtCore.Qt.Key_U: b"\x15", # Ctrl-U - QtCore.Qt.Key_V: b"\x16", # Ctrl-V - QtCore.Qt.Key_W: b"\x17", # Ctrl-W - QtCore.Qt.Key_X: b"\x18", # Ctrl-X - QtCore.Qt.Key_Y: b"\x19", # Ctrl-Y - QtCore.Qt.Key_Z: b"\x1a", # Ctrl-Z - QtCore.Qt.Key_Escape: b"\x1b", # Ctrl-Escape - QtCore.Qt.Key_Backslash: b"\x1c", # Ctrl-\ - QtCore.Qt.Key_Underscore: b"\x1f", # Ctrl-_ -} - -normal_keys_mapping = { - QtCore.Qt.Key_Return: b"\n", - QtCore.Qt.Key_Space: b" ", - QtCore.Qt.Key_Enter: b"\n", - QtCore.Qt.Key_Tab: b"\t", - QtCore.Qt.Key_Backspace: b"\x08", - QtCore.Qt.Key_Home: b"\x47", - QtCore.Qt.Key_End: b"\x4f", - QtCore.Qt.Key_Left: b"\x02", - QtCore.Qt.Key_Up: b"\x10", - QtCore.Qt.Key_Right: b"\x06", - QtCore.Qt.Key_Down: b"\x0e", - QtCore.Qt.Key_PageUp: b"\x49", - QtCore.Qt.Key_PageDown: b"\x51", - QtCore.Qt.Key_F1: b"\x1b\x31", - QtCore.Qt.Key_F2: b"\x1b\x32", - QtCore.Qt.Key_F3: b"\x1b\x33", - QtCore.Qt.Key_F4: b"\x1b\x34", - QtCore.Qt.Key_F5: b"\x1b\x35", - QtCore.Qt.Key_F6: b"\x1b\x36", - QtCore.Qt.Key_F7: b"\x1b\x37", - QtCore.Qt.Key_F8: b"\x1b\x38", - QtCore.Qt.Key_F9: b"\x1b\x39", - QtCore.Qt.Key_F10: b"\x1b\x30", - QtCore.Qt.Key_F11: b"\x45", - QtCore.Qt.Key_F12: b"\x46", -} - - -def QtKeyToAscii(event): - """ - Convert the Qt key event to the corresponding ASCII sequence for - the terminal. This works fine for standard alphanumerical characters, but - most other characters require terminal specific control sequences. - - The conversion below works for TERM="linux" terminals. - """ - if sys.platform == "darwin": - # special case for MacOS - # /!\ Qt maps ControlModifier to CMD - # CMD-C, CMD-V for copy/paste - # CTRL-C and other modifiers -> key mapping - if event.modifiers() == QtCore.Qt.MetaModifier: - if event.key() == Qt.Key_Backspace: - return control_keys_mapping.get(Qt.Key_W) - return control_keys_mapping.get(event.key()) - elif event.modifiers() == QtCore.Qt.ControlModifier: - if event.key() == Qt.Key_C: - # copy - return "copy" - elif event.key() == Qt.Key_V: - # paste - return "paste" - return None - else: - return normal_keys_mapping.get(event.key(), event.text().encode("utf8")) - if event.modifiers() == QtCore.Qt.ControlModifier: - return control_keys_mapping.get(event.key()) - else: - return normal_keys_mapping.get(event.key(), event.text().encode("utf8")) - - -class Screen(pyte.HistoryScreen): - def __init__(self, stdin_fd, cols, rows, historyLength): - super().__init__(cols, rows, historyLength, ratio=1 / rows) - self._fd = stdin_fd - - def write_process_input(self, data): - """Response to CPR request (for example), - this can be for other requests - """ - try: - os.write(self._fd, data.encode("utf-8")) - except (IOError, OSError): - pass - - def resize(self, lines, columns): - lines = lines or self.lines - columns = columns or self.columns - - if lines == self.lines and columns == self.columns: - return # No changes. - - self.dirty.clear() - self.dirty.update(range(lines)) - - self.save_cursor() - if lines < self.lines: - if lines <= self.cursor.y: - nlines_to_move_up = self.lines - lines - for i in range(nlines_to_move_up): - line = self.buffer[i] # .pop(0) - self.history.top.append(line) - self.cursor_position(0, 0) - self.delete_lines(nlines_to_move_up) - self.restore_cursor() - self.cursor.y -= nlines_to_move_up - else: - self.restore_cursor() - - self.lines, self.columns = lines, columns - self.history = History( - self.history.top, - self.history.bottom, - 1 / self.lines, - self.history.size, - self.history.position, - ) - self.set_margins() - - -class Backend(QtCore.QObject): - """ - Poll Bash. - - This class will run as a qsocketnotifier (started in ``_TerminalWidget``) and poll the - file descriptor of the Bash terminal. - """ - - # Signals to communicate with ``_TerminalWidget``. - dataReady = pyqtSignal(object) - processExited = pyqtSignal() - - def __init__(self, fd, cols, rows): - super().__init__() - - # File descriptor that connects to Bash process. - self.fd = fd - - # Setup Pyte (hard coded display size for now). - self.screen = Screen(self.fd, cols, rows, 10000) - self.stream = pyte.ByteStream() - self.stream.attach(self.screen) - - self.notifier = QSocketNotifier(fd, QSocketNotifier.Read) - self.notifier.activated.connect(self._fd_readable) - - def _fd_readable(self): - """ - Poll the Bash output, run it through Pyte, and notify - """ - # Read the shell output until the file descriptor is closed. - try: - out = os.read(self.fd, 2**16) - except OSError: - self.processExited.emit() - self.notifier.setEnabled(False) - return - - # Feed output into Pyte's state machine and send the new screen - # output to the GUI - self.stream.feed(out) - self.dataReady.emit(self.screen) - - -class BECConsole(QtWidgets.QWidget): - """Container widget for the terminal text area""" - - PLUGIN = True - ICON_NAME = "terminal" - - prompt = pyqtSignal(bool) - - def __init__(self, parent=None, cols=132): - super().__init__(parent) - - self.term = _TerminalWidget(self, cols, rows=43) - self.term.prompt.connect(self.prompt) # forward signal from term to this widget - - self.scroll_bar = QScrollBar(Qt.Vertical, self) - # self.scroll_bar.hide() - layout = QHBoxLayout(self) - layout.addWidget(self.term) - layout.addWidget(self.scroll_bar) - layout.setAlignment(Qt.AlignLeft | Qt.AlignTop) - layout.setContentsMargins(0, 0, 0, 0) - self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Expanding) - - pal = QPalette() - self.set_bgcolor(pal.window().color()) - self.set_fgcolor(pal.windowText().color()) - self.term.set_scroll_bar(self.scroll_bar) - self.set_cmd("bec --nogui") - - self._check_designer_timer = QTimer() - self._check_designer_timer.timeout.connect(self.check_designer) - self._check_designer_timer.start(1000) - - def minimumSizeHint(self): - size = self.term.sizeHint() - size.setWidth(size.width() + self.scroll_bar.width()) - return size - - def sizeHint(self): - return self.minimumSizeHint() - - def check_designer(self, calls={"n": 0}): - calls["n"] += 1 - if self.term.fd is not None: - # already started - self._check_designer_timer.stop() - elif self.window().windowTitle().endswith("[Preview]"): - # assuming Designer preview -> start - self._check_designer_timer.stop() - self.term.start() - elif calls["n"] >= 3: - # assuming not in Designer -> stop checking - self._check_designer_timer.stop() - - def get_rows(self): - return self.term.rows - - def set_rows(self, rows): - self.term.rows = rows - self.adjustSize() - self.updateGeometry() - - def get_cols(self): - return self.term.cols - - def set_cols(self, cols): - self.term.cols = cols - self.adjustSize() - self.updateGeometry() - - def get_bgcolor(self): - return QColor.fromString(self.term.bg_color) - - def set_bgcolor(self, color): - self.term.bg_color = color.name(QColor.HexRgb) - - def get_fgcolor(self): - return QColor.fromString(self.term.fg_color) - - def set_fgcolor(self, color): - self.term.fg_color = color.name(QColor.HexRgb) - - def get_cmd(self): - return self.term._cmd - - def set_cmd(self, cmd): - self.term._cmd = cmd - if self.term.fd is None: - # not started yet - self.term.clear() - self.term.appendHtml(f"

BEC Console - {repr(cmd)}

") - - def start(self, deactivate_ctrl_d=True): - self.term.start(deactivate_ctrl_d=deactivate_ctrl_d) - - def push(self, text, hit_return=False): - """Push some text to the terminal""" - return self.term.push(text, hit_return=hit_return) - - def execute_command(self, command): - self.push(command, hit_return=True) - - def set_prompt_tokens(self, *tokens): - """Prepare regexp to identify prompt, based on tokens - - Tokens are returned from get_ipython().prompts.in_prompt_tokens() - """ - regex_parts = [] - for token_type, token_value in tokens: - if token_type == Token.PromptNum: # Handle dynamic prompt number - regex_parts.append(r"[\d\?]+") # Match one or more digits or '?' - else: - # Escape other prompt parts (e.g., "In [", "]: ") - if not token_value: - regex_parts.append(".+?") # arbitrary string - else: - regex_parts.append(re.escape(token_value)) - - # Combine into a single regex - prompt_pattern = "".join(regex_parts) - self.term._prompt_re = re.compile(prompt_pattern + r"\s*$") - - def terminate(self, timeout=10): - self.term.stop(timeout=timeout) - - def send_ctrl_c(self, timeout=None): - self.term.send_ctrl_c(timeout) - - cols = pyqtProperty(int, get_cols, set_cols) - rows = pyqtProperty(int, get_rows, set_rows) - bgcolor = pyqtProperty(QColor, get_bgcolor, set_bgcolor) - fgcolor = pyqtProperty(QColor, get_fgcolor, set_fgcolor) - cmd = pyqtProperty(str, get_cmd, set_cmd) - - -class _TerminalWidget(QtWidgets.QPlainTextEdit): - """ - Start ``Backend`` process and render Pyte output as text. - """ - - prompt = pyqtSignal(bool) - - def __init__(self, parent, cols=125, rows=50, **kwargs): - # regexp to match prompt - self._prompt_re = None - # last prompt - self._prompt_str = None - # process pid - self.pid = None - # file descriptor to communicate with the subprocess - self.fd = None - self.backend = None - # command to execute - self._cmd = "" - # should ctrl-d be deactivated ? (prevent Python exit) - self._deactivate_ctrl_d = False - - # Default colors - pal = QPalette() - self._fg_color = pal.text().color().name() - self._bg_color = pal.base().color().name() - - # Specify the terminal size in terms of lines and columns. - self._rows = rows - self._cols = cols - self.output = collections.deque() - - super().__init__(parent) - - self.setSizePolicy(QSizePolicy.MinimumExpanding, QSizePolicy.Expanding) - - # Disable default scrollbars (we use our own, to be set via .set_scroll_bar()) - self.setVerticalScrollBarPolicy(Qt.ScrollBarAlwaysOff) - self.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff) - self.scroll_bar = None - - # Use Monospace fonts and disable line wrapping. - self.setFont(QtGui.QFont("Courier", 9)) - self.setFont(QtGui.QFont("Monospace")) - self.setLineWrapMode(QtWidgets.QPlainTextEdit.NoWrap) - fmt = QtGui.QFontMetrics(self.font()) - char_width = fmt.width("w") - self.setCursorWidth(char_width) - - self.adjustSize() - self.updateGeometry() - self.update_stylesheet() - - @property - def bg_color(self): - return self._bg_color - - @bg_color.setter - def bg_color(self, hexcolor): - self._bg_color = hexcolor - self.update_stylesheet() - - @property - def fg_color(self): - return self._fg_color - - @fg_color.setter - def fg_color(self, hexcolor): - self._fg_color = hexcolor - self.update_stylesheet() - - def update_stylesheet(self): - self.setStyleSheet( - f"QPlainTextEdit {{ border: 0; color: {self._fg_color}; background-color: {self._bg_color}; }} " - ) - - @property - def rows(self): - return self._rows - - @rows.setter - def rows(self, rows: int): - if self.backend is None: - # not initialized yet, ok to change - self._rows = rows - self.adjustSize() - self.updateGeometry() - else: - raise RuntimeError("Cannot change rows after console is started.") - - @property - def cols(self): - return self._cols - - @cols.setter - def cols(self, cols: int): - if self.fd is None: - # not initialized yet, ok to change - self._cols = cols - self.adjustSize() - self.updateGeometry() - else: - raise RuntimeError("Cannot change cols after console is started.") - - def start(self, deactivate_ctrl_d: bool = False): - self._deactivate_ctrl_d = deactivate_ctrl_d - - self.update_term_size() - - # Start the Bash process - self.pid, self.fd = self.fork_shell() - - if self.fd: - # Create the ``Backend`` object - self.backend = Backend(self.fd, self.cols, self.rows) - self.backend.dataReady.connect(self.data_ready) - self.backend.processExited.connect(self.process_exited) - else: - self.process_exited() - - def process_exited(self): - self.fd = None - self.clear() - self.appendHtml(f"

{repr(self._cmd)} - Process exited.

") - self.setReadOnly(True) - - def send_ctrl_c(self, wait_prompt=True, timeout=None): - """Send CTRL-C to the process - - If wait_prompt=True (default), wait for a new prompt after CTRL-C - If no prompt is displayed after 'timeout' seconds, TimeoutError is raised - """ - os.kill(self.pid, signal.SIGINT) - if wait_prompt: - timeout_error = False - if timeout: - - def set_timeout_error(): - nonlocal timeout_error - timeout_error = True - - timeout_timer = QTimer() - timeout_timer.singleShot(timeout * 1000, set_timeout_error) - while self._prompt_str is None: - QApplication.instance().process_events() - if timeout_error: - raise TimeoutError( - f"CTRL-C: could not get back to prompt after {timeout} seconds." - ) - - def _is_running(self): - if os.waitpid(self.pid, os.WNOHANG) == (0, 0): - return True - return False - - def stop(self, kill=True, timeout=None): - """Stop the running process - - SIGTERM is the default signal for terminating processes. - - If kill=True (default), SIGKILL will be sent if the process does not exit after timeout - """ - # try to exit gracefully - os.kill(self.pid, signal.SIGTERM) - - # wait until process is truly dead - t0 = time.perf_counter() - while self._is_running(): - time.sleep(1) - if timeout is not None and time.perf_counter() - t0 > timeout: - # still alive after 'timeout' seconds - if kill: - # send SIGKILL and make a last check in loop - os.kill(self.pid, signal.SIGKILL) - kill = False - else: - # still running after timeout... - raise TimeoutError( - f"Could not terminate process with pid: {self.pid} within timeout" - ) - self.process_exited() - - def data_ready(self, screen): - """Handle new screen: redraw, set scroll bar max and slider, move cursor to its position - - This method is triggered via a signal from ``Backend``. - """ - self.redraw_screen() - self.adjust_scroll_bar() - self.move_cursor() - - def minimumSizeHint(self): - """Return minimum size for current cols and rows""" - fmt = QtGui.QFontMetrics(self.font()) - char_width = fmt.width("w") - char_height = fmt.height() - width = char_width * self.cols - height = char_height * self.rows - return QSize(width, height) - - def sizeHint(self): - return self.minimumSizeHint() - - def set_scroll_bar(self, scroll_bar): - self.scroll_bar = scroll_bar - self.scroll_bar.setMinimum(0) - self.scroll_bar.valueChanged.connect(self.scroll_value_change) - - def scroll_value_change(self, value, old={"value": -1}): - if self.backend is None: - return - if old["value"] == -1: - old["value"] = self.scroll_bar.maximum() - if value <= old["value"]: - # scroll up - # value is number of lines from the start - nlines = old["value"] - value - # history ratio gives prev_page == 1 line - for i in range(nlines): - self.backend.screen.prev_page() - else: - # scroll down - nlines = value - old["value"] - for i in range(nlines): - self.backend.screen.next_page() - old["value"] = value - self.redraw_screen() - - def adjust_scroll_bar(self): - sb = self.scroll_bar - sb.valueChanged.disconnect(self.scroll_value_change) - tmp = len(self.backend.screen.history.top) + len(self.backend.screen.history.bottom) - sb.setMaximum(tmp if tmp > 0 else 0) - sb.setSliderPosition(tmp if tmp > 0 else 0) - # if tmp > 0: - # # show scrollbar, but delayed - prevent recursion with widget size change - # QTimer.singleShot(0, scrollbar.show) - # else: - # QTimer.singleShot(0, scrollbar.hide) - sb.valueChanged.connect(self.scroll_value_change) - - def write(self, data): - try: - os.write(self.fd, data) - except (IOError, OSError): - self.process_exited() - - @Slot(object) - def keyPressEvent(self, event): - """ - Redirect all keystrokes to the terminal process. - """ - if self.fd is None: - # not started - return - # Convert the Qt key to the correct ASCII code. - if ( - self._deactivate_ctrl_d - and event.modifiers() == QtCore.Qt.ControlModifier - and event.key() == QtCore.Qt.Key_D - ): - return None - - code = QtKeyToAscii(event) - if code == "copy": - # MacOS only: CMD-C handling - self.copy() - elif code == "paste": - # MacOS only: CMD-V handling - self._push_clipboard() - elif code is not None: - self.write(code) - - def push(self, text, hit_return=False): - """ - Write 'text' to terminal - """ - self.write(text.encode("utf-8")) - if hit_return: - self.write(b"\n") - - def contextMenuEvent(self, event): - if self.fd is None: - return - menu = self.createStandardContextMenu() - for action in menu.actions(): - # remove all actions except copy and paste - if "opy" in action.text(): - # redefine text without shortcut - # since it probably clashes with control codes (like CTRL-C etc) - action.setText("Copy") - continue - if "aste" in action.text(): - # redefine text without shortcut - action.setText("Paste") - # paste -> have to insert with self.push - action.triggered.connect(self._push_clipboard) - continue - menu.removeAction(action) - menu.exec_(event.globalPos()) - - def _push_clipboard(self): - clipboard = QApplication.instance().clipboard() - self.push(clipboard.text()) - - def move_cursor(self): - textCursor = self.textCursor() - textCursor.setPosition(0) - textCursor.movePosition( - QTextCursor.Down, QTextCursor.MoveAnchor, self.backend.screen.cursor.y - ) - textCursor.movePosition( - QTextCursor.Right, QTextCursor.MoveAnchor, self.backend.screen.cursor.x - ) - self.setTextCursor(textCursor) - - def mouseReleaseEvent(self, event): - if self.fd is None: - return - if event.button() == Qt.MiddleButton: - # push primary selection buffer ("mouse clipboard") to terminal - clipboard = QApplication.instance().clipboard() - if clipboard.supportsSelection(): - self.push(clipboard.text(QClipboard.Selection)) - return None - elif event.button() == Qt.LeftButton: - # left button click - textCursor = self.textCursor() - if textCursor.selectedText(): - # mouse was used to select text -> nothing to do - pass - else: - # a simple 'click', move scrollbar to end - self.scroll_bar.setSliderPosition(self.scroll_bar.maximum()) - self.move_cursor() - return None - return super().mouseReleaseEvent(event) - - def redraw_screen(self): - """ - Render the screen as formatted text into the widget. - """ - screen = self.backend.screen - - # Clear the widget - if screen.dirty: - self.clear() - while len(self.output) < (max(screen.dirty) + 1): - self.output.append("") - while len(self.output) > (max(screen.dirty) + 1): - self.output.pop() - - # Prepare the HTML output - for line_no in screen.dirty: - line = text = "" - style = old_style = "" - old_idx = 0 - for idx, ch in screen.buffer[line_no].items(): - text += " " * (idx - old_idx - 1) - old_idx = idx - style = f"{'background-color:%s;' % ansi_colors.get(ch.bg, ansi_colors['black']) if ch.bg!='default' else ''}{'color:%s;' % ansi_colors.get(ch.fg, ansi_colors['white']) if ch.fg!='default' else ''}{'font-weight:bold;' if ch.bold else ''}{'font-style:italic;' if ch.italics else ''}" - if style != old_style: - if old_style: - line += f"{html.escape(text, quote=True)}" - else: - line += html.escape(text, quote=True) - text = "" - old_style = style - text += ch.data - if style: - line += f"{html.escape(text, quote=True)}" - else: - line += html.escape(text, quote=True) - # do a check at the cursor position: - # it is possible x pos > output line length, - # for example if last escape codes are "cursor forward" past end of text, - # like IPython does for "..." prompt (in a block, like "for" loop or "while" for example) - # In this case, cursor is at 12 but last text output is at 8 -> insert spaces - if line_no == screen.cursor.y: - llen = len(screen.buffer[line_no]) - if llen < screen.cursor.x: - line += " " * (screen.cursor.x - llen) - self.output[line_no] = line - # fill the text area with HTML contents in one go - self.appendHtml(f"
{chr(10).join(self.output)}
") - - if self._prompt_re is not None: - text_buf = self.toPlainText() - prompt = self._prompt_re.search(text_buf) - if prompt is None: - if self._prompt_str: - self.prompt.emit(False) - self._prompt_str = None - else: - prompt_str = prompt.string.rstrip() - if prompt_str != self._prompt_str: - self._prompt_str = prompt_str - self.prompt.emit(True) - - # did updates, all clean - screen.dirty.clear() - - def update_term_size(self): - fmt = QtGui.QFontMetrics(self.font()) - char_width = fmt.width("w") - char_height = fmt.height() - self._cols = int(self.width() / char_width) - self._rows = int(self.height() / char_height) - - def resizeEvent(self, event): - self.update_term_size() - if self.fd: - self.backend.screen.resize(self._rows, self._cols) - self.redraw_screen() - self.adjust_scroll_bar() - self.move_cursor() - - def wheelEvent(self, event): - if not self.fd: - return - y = event.angleDelta().y() - if y > 0: - self.backend.screen.prev_page() - else: - self.backend.screen.next_page() - self.redraw_screen() - - def fork_shell(self): - """ - Fork the current process and execute bec in shell. - """ - try: - pid, fd = pty.fork() - except (IOError, OSError): - return False - if pid == 0: - try: - ls = os.environ["LANG"].split(".") - except KeyError: - ls = [] - if len(ls) < 2: - ls = ["en_US", "UTF-8"] - os.putenv("COLUMNS", str(self.cols)) - os.putenv("LINES", str(self.rows)) - os.putenv("TERM", "linux") - os.putenv("LANG", ls[0] + ".UTF-8") - if not self._cmd: - self._cmd = os.environ["SHELL"] - cmd = self._cmd - if isinstance(cmd, str): - cmd = cmd.split() - try: - os.execvp(cmd[0], cmd) - except (IOError, OSError): - pass - os._exit(0) - else: - # We are in the parent process. - # Set file control - fcntl.fcntl(fd, fcntl.F_SETFL, os.O_NONBLOCK) - return pid, fd - - -if __name__ == "__main__": - import os - import sys - - from qtpy import QtGui, QtWidgets - - # Create the Qt application and console. - app = QtWidgets.QApplication([]) - mainwin = QtWidgets.QMainWindow() - title = "BECConsole" - mainwin.setWindowTitle(title) - - console = BECConsole(mainwin) - mainwin.setCentralWidget(console) - - def check_prompt(at_prompt): - if at_prompt: - print("NEW PROMPT") - else: - print("EXECUTING SOMETHING...") - - console.set_prompt_tokens( - (Token.OutPromptNum, "•"), - (Token.Prompt, ""), # will match arbitrary string, - (Token.Prompt, " ["), - (Token.PromptNum, "3"), - (Token.Prompt, "/"), - (Token.PromptNum, "1"), - (Token.Prompt, "] "), - (Token.Prompt, "❯❯"), - ) - console.prompt.connect(check_prompt) - console.start() - - # Show widget and launch Qt's event loop. - mainwin.show() - sys.exit(app.exec_()) diff --git a/bec_widgets/widgets/editors/console/console.pyproject b/bec_widgets/widgets/editors/console/console.pyproject deleted file mode 100644 index 628b4e99..00000000 --- a/bec_widgets/widgets/editors/console/console.pyproject +++ /dev/null @@ -1 +0,0 @@ -{'files': ['console.py']} diff --git a/bec_widgets/widgets/editors/console/console_plugin.py b/bec_widgets/widgets/editors/console/console_plugin.py deleted file mode 100644 index eec5a993..00000000 --- a/bec_widgets/widgets/editors/console/console_plugin.py +++ /dev/null @@ -1,58 +0,0 @@ -# Copyright (C) 2022 The Qt Company Ltd. -# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause -import os - -from qtpy.QtDesigner import QDesignerCustomWidgetInterface - -import bec_widgets -from bec_widgets.utils.bec_designer import designer_material_icon -from bec_widgets.widgets.editors.console.console import BECConsole - -DOM_XML = """ - - - - -""" - -MODULE_PATH = os.path.dirname(bec_widgets.__file__) - - -class BECConsolePlugin(QDesignerCustomWidgetInterface): # pragma: no cover - def __init__(self): - super().__init__() - self._form_editor = None - - def createWidget(self, parent): - t = BECConsole(parent) - return t - - def domXml(self): - return DOM_XML - - def group(self): - return "BEC Console" - - def icon(self): - return designer_material_icon(BECConsole.ICON_NAME) - - def includeFile(self): - return "bec_console" - - def initialize(self, form_editor): - self._form_editor = form_editor - - def isContainer(self): - return False - - def isInitialized(self): - return self._form_editor is not None - - def name(self): - return "BECConsole" - - def toolTip(self): - return "A terminal-like vt100 widget." - - def whatsThis(self): - return self.toolTip() diff --git a/bec_widgets/widgets/editors/console/register_console.py b/bec_widgets/widgets/editors/console/register_console.py deleted file mode 100644 index f25026f6..00000000 --- a/bec_widgets/widgets/editors/console/register_console.py +++ /dev/null @@ -1,15 +0,0 @@ -def main(): # pragma: no cover - from qtpy import PYSIDE6 - - if not PYSIDE6: - print("PYSIDE6 is not available in the environment. Cannot patch designer.") - return - from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection - - from bec_widgets.widgets.editors.console.console_plugin import BECConsolePlugin - - QPyDesignerCustomWidgetCollection.addCustomWidget(BECConsolePlugin()) - - -if __name__ == "__main__": # pragma: no cover - main() diff --git a/tests/unit_tests/test_console.py b/tests/unit_tests/test_console.py deleted file mode 100644 index c93d1c87..00000000 --- a/tests/unit_tests/test_console.py +++ /dev/null @@ -1,65 +0,0 @@ -import sys -import threading -import time - -import pytest -from pygments.token import Token -from qtpy.QtCore import QEventLoop - -from bec_widgets.utils.colors import apply_theme -from bec_widgets.widgets.editors.console.console import BECConsole - - -@pytest.fixture -def console_widget(qtbot): - apply_theme("light") - console = BECConsole() - console.set_cmd(sys.executable) # will launch Python interpreter - console.set_prompt_tokens((Token.Prompt, ">>>")) - qtbot.addWidget(console) - console.show() - qtbot.waitExposed(console) - yield console - console.terminate() - - -def test_console_widget(console_widget, qtbot, tmp_path): - def wait_prompt(command_to_execute=None, busy=False): - signal_waiter = QEventLoop() - - def exit_loop(idle): - if busy and not idle: - signal_waiter.quit() - elif not busy and idle: - signal_waiter.quit() - - console_widget.prompt.connect(exit_loop) - if command_to_execute: - if callable(command_to_execute): - command_to_execute() - else: - console_widget.execute_command(command_to_execute) - signal_waiter.exec_() - - console_widget.start() - wait_prompt() - - # use console to write something to a tmp file - tmp_filename = str(tmp_path / "console_test.txt") - wait_prompt(f"f = open('{tmp_filename}', 'wt'); f.write('HELLO CONSOLE'); f.close()") - # check the code has been executed by console, by checking the tmp file contents - with open(tmp_filename, "rt") as f: - assert f.read() == "HELLO CONSOLE" - - # execute a sleep - t0 = time.perf_counter() - wait_prompt("import time; time.sleep(1)") - assert time.perf_counter() - t0 >= 1 - - # test ctrl-c - t0 = time.perf_counter() - wait_prompt("time.sleep(5)", busy=True) - wait_prompt(console_widget.send_ctrl_c) - assert ( - time.perf_counter() - t0 < 1 - ) # in reality it will be almost immediate, but ok we can say less than 1 second compared to 5