1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-15 13:10:54 +02:00

Compare commits

...

13 Commits

Author SHA1 Message Date
semantic-release
6d13a3283b 0.49.1
Automatically generated by python-semantic-release
2024-04-26 16:17:32 +00:00
ab8537483d fix(widgets/editor): qscintilla editor removed 2024-04-26 17:57:54 +02:00
a22229849c build(pyqt6): fixing PyQt6-Qt6 package to 6.6.3 2024-04-25 17:07:29 +02:00
semantic-release
1ba266080c 0.49.0
Automatically generated by python-semantic-release
2024-04-24 15:57:14 +00:00
6500a00682 feat(rpc/client_utils): timeout for rpc response 2024-04-24 17:49:23 +02:00
9602085f82 fix(rpc/client_utils): close clean up policy for BECFigure 2024-04-24 10:54:24 +02:00
semantic-release
a1c369de9b 0.48.0
Automatically generated by python-semantic-release
2024-04-24 05:29:44 +00:00
6238693ffb feat(cli): added auto updates plugin support 2024-04-23 15:22:45 +02:00
semantic-release
f3a387e77f 0.47.0
Automatically generated by python-semantic-release
2024-04-23 13:12:39 +00:00
71cb80d544 feat(utils/thread_checker): util class to check the thread leakage for closeEvent in qt 2024-04-23 14:53:13 +02:00
77ff7962cc refactor(utils/container_utils): part of the logic regarding locating widgets moved from BECFigure to utility class 2024-04-22 12:07:37 +02:00
semantic-release
a516b1b247 0.46.7
Automatically generated by python-semantic-release
2024-04-21 16:24:50 +00:00
67a99a1a19 fix(plot/image): monitors are now validated with current bec session 2024-04-20 01:35:17 +02:00
19 changed files with 360 additions and 683 deletions

View File

@@ -2,6 +2,40 @@
<!--next-version-placeholder-->
## v0.49.1 (2024-04-26)
### Fix
* **widgets/editor:** Qscintilla editor removed ([`ab85374`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/ab8537483da6c87cb9a0b0f01706208c964f292d))
## v0.49.0 (2024-04-24)
### Feature
* **rpc/client_utils:** Timeout for rpc response ([`6500a00`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/6500a00682a2a7ca535a138bd9496ed8470856a8))
### Fix
* **rpc/client_utils:** Close clean up policy for BECFigure ([`9602085`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/9602085f82cbc983f89b5bfe48bf35f08438fa87))
## v0.48.0 (2024-04-24)
### Feature
* **cli:** Added auto updates plugin support ([`6238693`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/6238693ffb44b47a56b969bc4129f2af7a2c04fe))
## v0.47.0 (2024-04-23)
### Feature
* **utils/thread_checker:** Util class to check the thread leakage for closeEvent in qt ([`71cb80d`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/71cb80d544c5f4ef499379a431ce0c17907c7ce8))
## v0.46.7 (2024-04-21)
### Fix
* **plot/image:** Monitors are now validated with current bec session ([`67a99a1`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/67a99a1a19c261f9a1f09635f274cd9fbfe53639))
## v0.46.6 (2024-04-19)
### Fix

View File

@@ -1 +1,2 @@
from .auto_updates import AutoUpdates, ScanInfo
from .client import BECFigure

View File

@@ -0,0 +1,118 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from pydantic import BaseModel
if TYPE_CHECKING:
from .client import BECFigure
class ScanInfo(BaseModel):
scan_id: str
scan_number: int
scan_name: str
scan_report_devices: list
monitored_devices: list
status: str
class AutoUpdates:
def __init__(self, figure: BECFigure, enabled: bool = True):
self.enabled = enabled
self.figure = figure
@staticmethod
def get_scan_info(msg) -> ScanInfo:
"""
Update the script with the given data.
"""
info = msg.info
status = msg.status
scan_id = msg.scan_id
scan_number = info.get("scan_number", 0)
scan_name = info.get("scan_name", "Unknown")
scan_report_devices = info.get("scan_report_devices", [])
monitored_devices = info.get("readout_priority", {}).get("monitored", [])
monitored_devices = [dev for dev in monitored_devices if dev not in scan_report_devices]
return ScanInfo(
scan_id=scan_id,
scan_number=scan_number,
scan_name=scan_name,
scan_report_devices=scan_report_devices,
monitored_devices=monitored_devices,
status=status,
)
def run(self, msg):
"""
Run the update function if enabled.
"""
if not self.enabled:
return
if msg.status != "open":
return
info = self.get_scan_info(msg)
self.handler(info)
@staticmethod
def get_selected_device(monitored_devices, selected_device):
"""
Get the selected device for the plot. If no device is selected, the first
device in the monitored devices list is selected.
"""
if selected_device:
return selected_device
if len(monitored_devices) > 0:
sel_device = monitored_devices[0]
return sel_device
return None
def handler(self, info: ScanInfo) -> None:
"""
Default update function.
"""
if info.scan_name == "line_scan" and info.scan_report_devices:
self.simple_line_scan(info)
return
if info.scan_name == "grid_scan" and info.scan_report_devices:
self.simple_grid_scan(info)
return
if info.scan_report_devices:
self.best_effort(info)
return
def simple_line_scan(self, info: ScanInfo) -> None:
"""
Simple line scan.
"""
dev_x = info.scan_report_devices[0]
dev_y = self.get_selected_device(info.monitored_devices, self.figure.selected_device)
if not dev_y:
return
self.figure.clear_all()
plt = self.figure.plot(dev_x, dev_y)
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
def simple_grid_scan(self, info: ScanInfo) -> None:
"""
Simple grid scan.
"""
dev_x = info.scan_report_devices[0]
dev_y = info.scan_report_devices[1]
dev_z = self.get_selected_device(info.monitored_devices, self.figure.selected_device)
self.figure.clear_all()
plt = self.figure.plot(dev_x, dev_y, dev_z, label=f"Scan {info.scan_number}")
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
def best_effort(self, info: ScanInfo) -> None:
"""
Best effort scan.
"""
dev_x = info.scan_report_devices[0]
dev_y = self.get_selected_device(info.monitored_devices, self.figure.selected_device)
if not dev_y:
return
self.figure.clear_all()
plt = self.figure.plot(dev_x, dev_y, label=f"Scan {info.scan_number}")
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.6 KiB

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import importlib
import importlib.metadata as imd
import os
import select
import subprocess
@@ -17,6 +18,7 @@ from bec_lib.device import DeviceBase
from qtpy.QtCore import QCoreApplication
import bec_widgets.cli.client as client
from bec_widgets.cli.auto_updates import AutoUpdates
from bec_widgets.utils.bec_dispatcher import BECDispatcher
if TYPE_CHECKING:
@@ -54,68 +56,22 @@ def rpc_call(func):
return wrapper
def get_selected_device(monitored_devices, selected_device):
"""
Get the selected device for the plot. If no device is selected, the first
device in the monitored devices list is selected.
"""
if selected_device:
return selected_device
if len(monitored_devices) > 0:
sel_device = monitored_devices[0]
return sel_device
return None
def update_script(figure: BECFigure, msg):
"""
Update the script with the given data.
"""
info = msg.info
status = msg.status
scan_id = msg.scan_id
scan_number = info.get("scan_number", 0)
scan_name = info.get("scan_name", "Unknown")
scan_report_devices = info.get("scan_report_devices", [])
monitored_devices = info.get("readout_priority", {}).get("monitored", [])
monitored_devices = [dev for dev in monitored_devices if dev not in scan_report_devices]
if scan_name == "line_scan" and scan_report_devices:
dev_x = scan_report_devices[0]
dev_y = get_selected_device(monitored_devices, figure.selected_device)
print(f"Selected device: {dev_y}")
if not dev_y:
return
figure.clear_all()
plt = figure.plot(dev_x, dev_y)
plt.set(title=f"Scan {scan_number}", x_label=dev_x, y_label=dev_y)
elif scan_name == "grid_scan" and scan_report_devices:
print(f"Scan {scan_number} is running")
dev_x = scan_report_devices[0]
dev_y = scan_report_devices[1]
dev_z = get_selected_device(monitored_devices, figure.selected_device)
figure.clear_all()
plt = figure.plot(dev_x, dev_y, dev_z, label=f"Scan {scan_number}")
plt.set(title=f"Scan {scan_number}", x_label=dev_x, y_label=dev_y)
elif scan_report_devices:
dev_x = scan_report_devices[0]
dev_y = get_selected_device(monitored_devices, figure.selected_device)
if not dev_y:
return
figure.clear_all()
plt = figure.plot(dev_x, dev_y, label=f"Scan {scan_number}")
plt.set(title=f"Scan {scan_number}", x_label=dev_x, y_label=dev_y)
class BECFigureClientMixin:
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._process = None
self.update_script = update_script
self.update_script = self._get_update_script()
self._target_endpoint = MessageEndpoints.scan_status()
self._selected_device = None
self.stderr_output = []
def _get_update_script(self) -> AutoUpdates:
eps = imd.entry_points(group="bec.widgets.auto_updates")
for ep in eps:
if ep.name == "plugin_widgets_update":
return ep.load()(figure=self)
return None
@property
def selected_device(self):
"""
@@ -147,8 +103,7 @@ class BECFigureClientMixin:
if isinstance(msg, messages.ScanStatusMessage):
if not self.gui_is_alive():
return
if msg.status == "open":
self.update_script(self, msg)
self.update_script.run(msg)
def show(self) -> None:
"""
@@ -166,7 +121,10 @@ class BECFigureClientMixin:
"""
if self._process is None:
return
self._run_rpc("close", (), wait_for_rpc_response=False)
if self.gui_is_alive():
self._run_rpc("close", (), wait_for_rpc_response=True)
else:
self._run_rpc("close", (), wait_for_rpc_response=False)
self._process.terminate()
self._process_output_processing_thread.join()
self._process = None
@@ -212,6 +170,15 @@ class BECFigureClientMixin:
self.stderr_output.append(self._process.stderr.read(1024))
class RPCResponseTimeoutError(Exception):
"""Exception raised when an RPC response is not received within the expected time."""
def __init__(self, request_id, timeout):
super().__init__(
f"RPC response not received within {timeout} seconds for request ID {request_id}"
)
class RPCBase:
def __init__(self, gui_id: str = None, config: dict = None, parent=None) -> None:
self._client = BECDispatcher().client
@@ -257,7 +224,7 @@ class RPCBase:
parameter={"args": args, "kwargs": kwargs, "gui_id": self._gui_id},
metadata={"request_id": request_id},
)
# print(f"RPCBase: {rpc_msg}")
# pylint: disable=protected-access
receiver = self._root._gui_id
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
@@ -292,16 +259,28 @@ class RPCBase:
return cls(parent=self, **msg_result)
return msg_result
def _wait_for_response(self, request_id):
def _wait_for_response(self, request_id: str, timeout: int = 5):
"""
Wait for the response from the server.
Args:
request_id(str): The request ID.
timeout(int): The timeout in seconds.
Returns:
The response from the server.
"""
start_time = time.time()
response = None
while response is None and self.gui_is_alive():
while response is None and self.gui_is_alive() and (time.time() - start_time) < timeout:
response = self._client.connector.get(
MessageEndpoints.gui_instruction_response(request_id)
)
QCoreApplication.processEvents() # keep UI responsive (and execute signals/slots)
time.sleep(0.1)
if response is None and (time.time() - start_time) >= timeout:
raise RPCResponseTimeoutError(request_id, timeout)
return response
def gui_is_alive(self):

View File

@@ -1,4 +1,6 @@
import inspect
import threading
import time
from bec_lib import MessageEndpoints, messages
from qtpy.QtCore import QTimer
@@ -109,17 +111,25 @@ class BECWidgetsCLIServer:
def shutdown(self):
self._shutdown_event = True
self._heartbeat_timer.stop()
self.client.shutdown()
if __name__ == "__main__": # pragma: no cover
import argparse
import sys
from qtpy.QtCore import QSize
from qtpy.QtGui import QIcon
from qtpy.QtWidgets import QApplication, QMainWindow
app = QApplication(sys.argv)
app.setApplicationName("BEC Figure")
icon = QIcon()
icon.addFile("bec_widgets_icon.png", size=QSize(48, 48))
app.setWindowIcon(icon)
win = QMainWindow()
win.setWindowTitle("BEC Widgets")
parser = argparse.ArgumentParser(description="BEC Widgets CLI Server")
parser.add_argument("--id", type=str, help="The id of the server")

View File

@@ -2,6 +2,7 @@ from .bec_connector import BECConnector, ConnectionConfig
from .bec_dispatcher import BECDispatcher
from .bec_table import BECTable
from .colors import Colors
from .container_utils import WidgetContainerUtils
from .crosshair import Crosshair
from .entry_validator import EntryValidator
from .rpc_decorator import register_rpc_methods, rpc_public

View File

@@ -127,3 +127,7 @@ class BECConnector:
return self.config.model_dump()
else:
return self.config
def closeEvent(self, event):
self.client.shutdown()
super().closeEvent(event)

View File

@@ -0,0 +1,45 @@
import itertools
from typing import Type
from qtpy.QtWidgets import QWidget
class WidgetContainerUtils:
@staticmethod
def generate_unique_widget_id(container: dict, prefix: str = "widget") -> str:
"""
Generate a unique widget ID.
Args:
container(dict): The container of widgets.
prefix(str): The prefix of the widget ID.
Returns:
widget_id(str): The unique widget ID.
"""
existing_ids = set(container.keys())
for i in itertools.count(1):
widget_id = f"{prefix}_{i}"
if widget_id not in existing_ids:
return widget_id
@staticmethod
def find_first_widget_by_class(
container: dict, widget_class: Type[QWidget], can_fail: bool = True
) -> QWidget | None:
"""
Find the first widget of a given class in the figure.
Args:
container(dict): The container of widgets.
widget_class(Type): The class of the widget to find.
can_fail(bool): If True, the method will return None if no widget is found. If False, it will raise an error.
Returns:
widget: The widget of the given class.
"""
for widget_id, widget in container.items():
if isinstance(widget, widget_class):
return widget
if can_fail:
return None
else:
raise ValueError(f"No widget of class {widget_class} found.")

View File

@@ -3,6 +3,15 @@ class EntryValidator:
self.devices = devices
def validate_signal(self, name: str, entry: str = None) -> str:
"""
Validate a signal entry for a given device. If the entry is not provided, the first signal entry will be used from the device hints.
Args:
name(str): Device name
entry(str): Signal entry
Returns:
str: Signal entry
"""
if name not in self.devices:
raise ValueError(f"Device '{name}' not found in current BEC session")
@@ -15,3 +24,17 @@ class EntryValidator:
raise ValueError(f"Entry '{entry}' not found in device '{name}' signals")
return entry
def validate_monitor(self, monitor: str) -> str:
"""
Validate a monitor entry for a given device.
Args:
monitor(str): Monitor entry
Returns:
str: Monitor entry
"""
if monitor not in self.devices:
raise ValueError(f"Device '{monitor}' not found in current BEC session")
return monitor

View File

@@ -0,0 +1,37 @@
import threading
class ThreadTracker:
def __init__(self, exclude_names=None):
self.exclude_names = exclude_names if exclude_names else []
self.initial_threads = self._capture_threads()
def _capture_threads(self):
return set(
th
for th in threading.enumerate()
if not any(ex_name in th.name for ex_name in self.exclude_names)
and th is not threading.main_thread()
)
def _thread_info(self, threads):
return ", \n".join(f"{th.name}(ID: {th.ident})" for th in threads)
def check_unfinished_threads(self):
current_threads = self._capture_threads()
additional_threads = current_threads - self.initial_threads
closed_threads = self.initial_threads - current_threads
if additional_threads:
raise Exception(
f"###### Initial threads ######:\n {self._thread_info(self.initial_threads)}\n"
f"###### Current threads ######:\n {self._thread_info(current_threads)}\n"
f"###### Closed threads ######:\n {self._thread_info(closed_threads)}\n"
f"###### Unfinished threads ######:\n {self._thread_info(additional_threads)}"
)
else:
print(
"All threads properly closed.\n"
f"###### Initial threads ######:\n {self._thread_info(self.initial_threads)}\n"
f"###### Current threads ######:\n {self._thread_info(current_threads)}\n"
f"###### Closed threads ######:\n {self._thread_info(closed_threads)}"
)

View File

@@ -1,4 +1,3 @@
from .editor import BECEditor
from .figure import BECFigure, FigureConfig
from .monitor import BECMonitor
from .motor_control import (

View File

@@ -1 +0,0 @@
from .editor import BECEditor

View File

@@ -1,407 +0,0 @@
import subprocess
import qdarktheme
from jedi import Script
from jedi.api import Completion
from qtconsole.manager import QtKernelManager
from qtconsole.rich_jupyter_widget import RichJupyterWidget
# pylint: disable=no-name-in-module
from qtpy.Qsci import QsciAPIs, QsciLexerPython, QsciScintilla
from qtpy.QtCore import Qt, QThread, Signal
from qtpy.QtGui import QColor, QFont
from qtpy.QtWidgets import QApplication, QFileDialog, QSplitter, QTextEdit, QVBoxLayout, QWidget
from bec_widgets.widgets.toolbar import ModularToolBar
class AutoCompleter(QThread):
"""Initializes the AutoCompleter thread for handling autocompletion and signature help.
Args:
file_path (str): The path to the file for which autocompletion is required.
api (QsciAPIs): The QScintilla API instance used for managing autocompletions.
enable_docstring (bool, optional): Flag to determine if docstrings should be included in the signatures.
"""
def __init__(self, file_path: str, api: QsciAPIs, enable_docstring: bool = False):
super().__init__(None)
self.file_path = file_path
self.script: Script = None
self.api: QsciAPIs = api
self.completions: list[Completion] = None
self.line = 0
self.index = 0
self.text = ""
# TODO so far disabled, quite buggy, docstring extraction has to be generalised
self.enable_docstring = enable_docstring
def update_script(self, text: str):
"""Updates the script for Jedi completion based on the current editor text.
Args:
text (str): The current text of the editor.
"""
if self.script is None or self.script.path != text:
self.script = Script(text, path=self.file_path)
def run(self):
"""Runs the thread for generating autocompletions. Overrides QThread.run."""
self.update_script(self.text)
try:
self.completions = self.script.complete(self.line, self.index)
self.load_autocomplete(self.completions)
except Exception as err:
print(err)
self.finished.emit()
def get_function_signature(self, line: int, index: int, text: str) -> str:
"""Fetches the function signature for a given position in the text.
Args:
line (int): The line number in the editor.
index (int): The index (column number) in the line.
text (str): The current text of the editor.
Returns:
str: A string containing the function signature or an empty string if not available.
"""
self.update_script(text)
try:
signatures = self.script.get_signatures(line, index)
if signatures and self.enable_docstring is True:
full_docstring = signatures[0].docstring(raw=True)
compact_docstring = self.get_compact_docstring(full_docstring)
return compact_docstring
if signatures and self.enable_docstring is False:
return signatures[0].to_string()
except Exception as err:
print(f"Signature Error:{err}")
return ""
def load_autocomplete(self, completions: list):
"""Loads the autocomplete suggestions into the QScintilla API.
Args:
completions (list[Completion]): A list of Completion objects to be added to the API.
"""
self.api.clear()
for i in completions:
self.api.add(i.name)
self.api.prepare()
def get_completions(self, line: int, index: int, text: str):
"""Starts the autocompletion process for a given position in the text.
Args:
line (int): The line number in the editor.
index (int): The index (column number) in the line.
text (str): The current text of the editor.
"""
self.line = line
self.index = index
self.text = text
self.start()
def get_compact_docstring(self, full_docstring):
"""Generates a compact version of a function's docstring.
Args:
full_docstring (str): The full docstring of a function.
Returns:
str: A compact version of the docstring.
"""
lines = full_docstring.split("\n")
# TODO make it also for different docstring styles, now it is only for numpy style
cutoff_indices = [
i
for i, line in enumerate(lines)
if line.strip().lower() in ["parameters", "returns", "examples", "see also", "warnings"]
]
if cutoff_indices:
lines = lines[: cutoff_indices[0]]
compact_docstring = "\n".join(lines).strip()
return compact_docstring
class ScriptRunnerThread(QThread):
"""Initializes the thread for running a Python script.
Args:
script (str): The script to be executed.
"""
outputSignal = Signal(str)
def __init__(self, script):
super().__init__()
self.script = script
def run(self):
"""Executes the script in a subprocess and emits output through a signal. Overrides QThread.run."""
process = subprocess.Popen(
["python", "-u", "-c", self.script],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1,
universal_newlines=True,
text=True,
)
while True:
output = process.stdout.readline()
if output == "" and process.poll() is not None:
break
if output:
self.outputSignal.emit(output)
error = process.communicate()[1]
if error:
self.outputSignal.emit(error)
class BECEditor(QWidget):
"""Initializes the BEC Editor widget.
Args:
toolbar_enabled (bool, optional): Determines if the toolbar should be enabled. Defaults to True.
"""
def __init__(
self, toolbar_enabled=True, jupyter_terminal_enabled=False, docstring_tooltip=False
):
super().__init__()
self.script_runner_thread = None
self.file_path = None
self.docstring_tooltip = docstring_tooltip
self.jupyter_terminal_enabled = jupyter_terminal_enabled
# TODO just temporary solution, could be extended to other languages
self.is_python_file = True
# Initialize the editor and terminal
self.editor = QsciScintilla()
if self.jupyter_terminal_enabled:
self.terminal = self.make_jupyter_widget_with_kernel()
else:
self.terminal = QTextEdit()
self.terminal.setReadOnly(True)
# Layout
self.layout = QVBoxLayout()
# Initialize and add the toolbar if enabled
if toolbar_enabled:
self.toolbar = ModularToolBar(self)
self.layout.addWidget(self.toolbar)
# Initialize the splitter
self.splitter = QSplitter(Qt.Orientation.Vertical, self)
self.splitter.addWidget(self.editor)
self.splitter.addWidget(self.terminal)
self.splitter.setSizes([400, 200])
# Add Splitter to layout
self.layout.addWidget(self.splitter)
self.setLayout(self.layout)
self.setup_editor()
def setup_editor(self):
"""Sets up the editor with necessary configurations like lexer, auto indentation, and line numbers."""
# Set the lexer for Python
self.lexer = QsciLexerPython()
self.editor.setLexer(self.lexer)
# Enable auto indentation and competition within the editor
self.editor.setAutoIndent(True)
self.editor.setIndentationsUseTabs(False)
self.editor.setIndentationWidth(4)
self.editor.setAutoCompletionSource(QsciScintilla.AutoCompletionSource.AcsAll)
self.editor.setAutoCompletionThreshold(1)
# Autocomplete for python file
# Connect cursor position change signal for autocompletion
self.editor.cursorPositionChanged.connect(self.on_cursor_position_changed)
# if self.is_python_file: #TODO can be changed depending on supported languages
self.__api = QsciAPIs(self.lexer)
self.auto_completer = AutoCompleter(
self.editor.text(), self.__api, enable_docstring=self.docstring_tooltip
)
self.auto_completer.finished.connect(self.loaded_autocomplete)
# Enable line numbers in the margin
self.editor.setMarginType(0, QsciScintilla.MarginType.NumberMargin)
self.editor.setMarginWidth(0, "0000") # Adjust the width as needed
# Additional UI elements like menu for load/save can be added here
self.set_editor_style()
@staticmethod
def make_jupyter_widget_with_kernel() -> object:
"""Start a kernel, connect to it, and create a RichJupyterWidget to use it"""
kernel_manager = QtKernelManager(kernel_name="python3")
kernel_manager.start_kernel()
kernel_client = kernel_manager.client()
kernel_client.start_channels()
jupyter_widget = RichJupyterWidget()
jupyter_widget.set_default_style("linux")
jupyter_widget.kernel_manager = kernel_manager
jupyter_widget.kernel_client = kernel_client
return jupyter_widget
def show_call_tip(self, position):
"""Shows a call tip at the given position in the editor.
Args:
position (int): The position in the editor where the call tip should be shown.
"""
line, index = self.editor.lineIndexFromPosition(position)
signature = self.auto_completer.get_function_signature(line + 1, index, self.editor.text())
if signature:
self.editor.showUserList(1, [signature])
def on_cursor_position_changed(self, line, index):
"""Handles the event of cursor position change in the editor.
Args:
line (int): The current line number where the cursor is.
index (int): The current column index where the cursor is.
"""
# if self.is_python_file: #TODO can be changed depending on supported languages
# Get completions
self.auto_completer.get_completions(line + 1, index, self.editor.text())
self.editor.autoCompleteFromAPIs()
# Show call tip - signature
position = self.editor.positionFromLineIndex(line, index)
self.show_call_tip(position)
def loaded_autocomplete(self):
"""Placeholder method for actions after autocompletion data is loaded."""
def set_editor_style(self):
"""Sets the style and color scheme for the editor."""
# Dracula Theme Colors
background_color = QColor("#282a36")
text_color = QColor("#f8f8f2")
keyword_color = QColor("#8be9fd")
string_color = QColor("#f1fa8c")
comment_color = QColor("#6272a4")
class_function_color = QColor("#50fa7b")
# Set Font
font = QFont()
font.setFamily("Consolas")
font.setPointSize(10)
self.editor.setFont(font)
self.editor.setMarginsFont(font)
# Set Editor Colors
self.editor.setMarginsBackgroundColor(background_color)
self.editor.setMarginsForegroundColor(text_color)
self.editor.setCaretForegroundColor(text_color)
self.editor.setCaretLineBackgroundColor(QColor("#44475a"))
self.editor.setPaper(background_color) # Set the background color for the entire paper
self.editor.setColor(text_color)
# Set editor
# Syntax Highlighting Colors
lexer = self.editor.lexer()
if lexer:
lexer.setDefaultPaper(background_color) # Set the background color for the text area
lexer.setDefaultColor(text_color)
lexer.setColor(keyword_color, QsciLexerPython.Keyword)
lexer.setColor(string_color, QsciLexerPython.DoubleQuotedString)
lexer.setColor(string_color, QsciLexerPython.SingleQuotedString)
lexer.setColor(comment_color, QsciLexerPython.Comment)
lexer.setColor(class_function_color, QsciLexerPython.ClassName)
lexer.setColor(class_function_color, QsciLexerPython.FunctionMethodName)
# Set the style for all text to have a transparent background
# TODO find better way how to do it!
for style in range(
128
): # QsciScintilla supports 128 styles by default, this set all to transparent background
self.lexer.setPaper(background_color, style)
def run_script(self):
"""Runs the current script in the editor."""
if self.jupyter_terminal_enabled:
script = self.editor.text()
self.terminal.execute(script)
else:
script = self.editor.text()
self.script_runner_thread = ScriptRunnerThread(script)
self.script_runner_thread.outputSignal.connect(self.update_terminal)
self.script_runner_thread.start()
def update_terminal(self, text):
"""Updates the terminal with new text.
Args:
text (str): The text to be appended to the terminal.
"""
self.terminal.append(text)
def enable_docstring_tooltip(self):
"""Enables the docstring tooltip."""
self.docstring_tooltip = True
self.auto_completer.enable_docstring = True
def open_file(self):
"""Opens a file dialog for selecting and opening a Python file in the editor."""
options = QFileDialog.Options()
options |= QFileDialog.DontUseNativeDialog
file_path, _ = QFileDialog.getOpenFileName(
self, "Open file", "", "Python files (*.py);;All Files (*)", options=options
)
if not file_path:
return
try:
with open(file_path, "r") as file:
text = file.read()
self.editor.setText(text)
except FileNotFoundError:
print(f"The file {file_path} was not found.")
except Exception as e:
print(f"An error occurred while opening the file {file_path}: {e}")
def save_file(self):
"""Opens a save file dialog for saving the current script in the editor."""
options = QFileDialog.Options()
options |= QFileDialog.DontUseNativeDialog
file_path, _ = QFileDialog.getSaveFileName(
self, "Save file", "", "Python files (*.py);;All Files (*)", options=options
)
if not file_path:
return
try:
if not file_path.endswith(".py"):
file_path += ".py"
with open(file_path, "w") as file:
text = self.editor.text()
file.write(text)
print(f"File saved to {file_path}")
except Exception as e:
print(f"An error occurred while saving the file to {file_path}: {e}")
if __name__ == "__main__": # pragma: no cover
app = QApplication([])
qdarktheme.setup_theme("auto")
mainWin = BECEditor(jupyter_terminal_enabled=True)
mainWin.show()
app.exec()

View File

@@ -14,7 +14,7 @@ from pyqtgraph.Qt import uic
from qtpy.QtCore import Signal as pyqtSignal
from qtpy.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget
from bec_widgets.utils import BECConnector, BECDispatcher, ConnectionConfig
from bec_widgets.utils import BECConnector, BECDispatcher, ConnectionConfig, WidgetContainerUtils
from bec_widgets.widgets.plots import (
BECImageShow,
BECMotorMap,
@@ -188,7 +188,7 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
config(dict): Additional configuration for the widget.
**axis_kwargs(dict): Additional axis properties to set on the widget after creation.
"""
widget_id = self._generate_unique_widget_id()
widget_id = WidgetContainerUtils.generate_unique_widget_id(self._widgets)
waveform = self.add_widget(
widget_type="Waveform1D",
widget_id=widget_id,
@@ -278,7 +278,9 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
Returns:
BECWaveform: The waveform plot widget.
"""
waveform = self._find_first_widget_by_class(BECWaveform, can_fail=True)
waveform = WidgetContainerUtils.find_first_widget_by_class(
self._widgets, BECWaveform, can_fail=True
)
if waveform is not None:
if axis_kwargs:
waveform.set(**axis_kwargs)
@@ -355,7 +357,9 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
Returns:
BECImageShow: The image widget.
"""
image = self._find_first_widget_by_class(BECImageShow, can_fail=True)
image = WidgetContainerUtils.find_first_widget_by_class(
self._widgets, BECImageShow, can_fail=True
)
if image is not None:
if axis_kwargs:
image.set(**axis_kwargs)
@@ -410,7 +414,7 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
BECImageShow: The image widget.
"""
widget_id = self._generate_unique_widget_id()
widget_id = WidgetContainerUtils.generate_unique_widget_id(self._widgets)
if config is None:
config = ImageConfig(
widget_class="BECImageShow",
@@ -457,7 +461,9 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
Returns:
BECMotorMap: The motor map widget.
"""
motor_map = self._find_first_widget_by_class(BECMotorMap, can_fail=True)
motor_map = WidgetContainerUtils.find_first_widget_by_class(
self._widgets, BECMotorMap, can_fail=True
)
if motor_map is not None:
if axis_kwargs:
motor_map.set(**axis_kwargs)
@@ -491,7 +497,7 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
Returns:
BECMotorMap: The motor map widget.
"""
widget_id = self._generate_unique_widget_id()
widget_id = WidgetContainerUtils.generate_unique_widget_id(self._widgets)
if config is None:
config = MotorMapConfig(
widget_class="BECMotorMap",
@@ -532,7 +538,7 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
**axis_kwargs(dict): Additional axis properties to set on the widget after creation.
"""
if not widget_id:
widget_id = self._generate_unique_widget_id()
widget_id = WidgetContainerUtils.generate_unique_widget_id(self._widgets)
if widget_id in self._widgets:
raise ValueError(f"Widget with ID '{widget_id}' already exists.")
@@ -610,25 +616,6 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
self.setBackground("k" if theme == "dark" else "w")
self.config.theme = theme
def _find_first_widget_by_class(
self, widget_class: Type[BECPlotBase], can_fail: bool = True
) -> BECPlotBase | None:
"""
Find the first widget of a given class in the figure.
Args:
widget_class(Type[BECPlotBase]): The class of the widget to find.
can_fail(bool): If True, the method will return None if no widget is found. If False, it will raise an error.
Returns:
BECPlotBase: The widget of the given class.
"""
for widget_id, widget in self._widgets.items():
if isinstance(widget, widget_class):
return widget
if can_fail:
return None
else:
raise ValueError(f"No widget of class {widget_class} found.")
def _remove_by_coordinates(self, row: int, col: int) -> None:
"""
Remove a widget from the figure by its coordinates.
@@ -695,14 +682,6 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
row += 1
return row, col
def _generate_unique_widget_id(self):
"""Generate a unique widget ID."""
existing_ids = set(self._widgets.keys())
for i in itertools.count(1):
widget_id = f"widget_{i}"
if widget_id not in existing_ids:
return widget_id
def _change_grid(self, widget_id: str, row: int, col: int):
"""
Change the grid to reflect the new position of the widget.

View File

@@ -12,7 +12,7 @@ from qtpy.QtCore import Signal as pyqtSignal
from qtpy.QtCore import Slot as pyqtSlot
from qtpy.QtWidgets import QWidget
from bec_widgets.utils import BECConnector, ConnectionConfig
from bec_widgets.utils import BECConnector, ConnectionConfig, EntryValidator
from .plot_base import BECPlotBase, WidgetConfig
@@ -335,7 +335,9 @@ class BECImageShow(BECPlotBase):
super().__init__(
parent=parent, parent_figure=parent_figure, config=config, client=client, gui_id=gui_id
)
# Get bec shortcuts dev, scans, queue, scan_storage, dap
self.get_bec_shortcuts()
self.entry_validator = EntryValidator(self.dev)
self._images = defaultdict(dict)
self.apply_config(self.config)
self.processor = ImageProcessor()
@@ -507,6 +509,8 @@ class BECImageShow(BECPlotBase):
f"Monitor with ID '{monitor}' already exists in widget '{self.gui_id}'."
)
monitor = self.entry_validator.validate_monitor(monitor)
image_config = ImageItemConfig(
widget_class="BECImageItem",
parent_id=self.gui_id,
@@ -785,6 +789,22 @@ class BECImageShow(BECPlotBase):
return True
return False
def _validate_monitor(self, monitor: str, validate_bec: bool = True):
"""
Validate the monitor name.
Args:
monitor(str): The name of the monitor.
validate_bec(bool): Whether to validate the monitor name with BEC.
Returns:
bool: True if the monitor name is valid, False otherwise.
"""
if not monitor or monitor == "":
return False
if validate_bec:
return monitor in self.dev
return True
def cleanup(self):
"""
Clean up the widget.

View File

@@ -1,11 +1,10 @@
# pylint: disable= missing-module-docstring
from setuptools import find_packages, setup
__version__ = "0.46.6"
__version__ = "0.49.1"
# Default to PyQt6 if no other Qt binding is installed
QT_DEPENDENCY = "PyQt6>=6.0"
QSCINTILLA_DEPENDENCY = "PyQt6-QScintilla"
QT_DEPENDENCY = "PyQt6<=6.6.3"
# pylint: disable=unused-import
try:
@@ -14,15 +13,14 @@ except ImportError:
pass
else:
QT_DEPENDENCY = "PyQt5>=5.9"
QSCINTILLA_DEPENDENCY = "QScintilla"
if __name__ == "__main__":
setup(
install_requires=[
"pydantic",
"qtconsole",
"PyQt6-Qt6<=6.6.3",
QT_DEPENDENCY,
QSCINTILLA_DEPENDENCY,
"jedi",
"qtpy",
"pyqtgraph",
@@ -43,10 +41,16 @@ if __name__ == "__main__":
"isort",
],
"pyqt5": ["PyQt5>=5.9"],
"pyqt6": ["PyQt6>=6.0"],
"pyqt6": ["PyQt6<=6.6.3"],
},
version=__version__,
packages=find_packages(),
include_package_data=True,
package_data={"": ["*.ui", "*.yaml"]},
package_data={
"": [
"*.ui",
"*.yaml",
"*.png",
]
},
)

View File

@@ -91,6 +91,7 @@ DEVICES = [
FakeDevice("bpm4i"),
FakeDevice("bpm3a"),
FakeDevice("bpm3i"),
FakeDevice("eiger"),
]

View File

@@ -1,170 +0,0 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import os
import tempfile
from unittest.mock import MagicMock, mock_open, patch
import pytest
from qtpy.Qsci import QsciScintilla
from qtpy.QtWidgets import QTextEdit
from bec_widgets.widgets.editor.editor import AutoCompleter, BECEditor
@pytest.fixture(scope="function")
def editor(qtbot, docstring_tooltip=False):
"""Helper function to set up the BECEditor widget."""
widget = BECEditor(toolbar_enabled=True, docstring_tooltip=docstring_tooltip)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def find_action_by_text(toolbar, text):
"""Helper function to find an action in the toolbar by its text."""
for action in toolbar.actions():
if action.text() == text:
return action
return None
def test_bec_editor_initialization(editor):
"""Test if the BECEditor widget is initialized correctly."""
assert isinstance(editor.editor, QsciScintilla)
assert isinstance(editor.terminal, QTextEdit)
assert isinstance(editor.auto_completer, AutoCompleter)
@patch("bec_widgets.widgets.editor.editor.Script") # Mock the Script class from jedi
def test_autocompleter_suggestions(mock_script, editor, qtbot):
"""Test if the autocompleter provides correct suggestions based on input."""
# Set up mock return values for the Script.complete method
mock_completion = MagicMock()
mock_completion.name = "mocked_method"
mock_script.return_value.complete.return_value = [mock_completion]
# Simulate user input in the editor
test_code = "print("
editor.editor.setText(test_code)
line, index = editor.editor.getCursorPosition()
# Trigger autocomplete
editor.auto_completer.get_completions(line, index, test_code)
# Use qtbot to wait for the completion thread
qtbot.waitUntil(lambda: editor.auto_completer.completions is not None, timeout=1000)
# Check if the expected completion is in the autocompleter's suggestions
suggested_methods = [completion.name for completion in editor.auto_completer.completions]
assert "mocked_method" in suggested_methods
@patch("bec_widgets.widgets.editor.editor.Script") # Mock the Script class from jedi
@pytest.mark.parametrize(
"docstring_enabled, expected_signature",
[(True, "Mocked signature with docstring"), (False, "Mocked signature")],
)
def test_autocompleter_signature(mock_script, editor, docstring_enabled, expected_signature):
"""Test if the autocompleter provides correct function signature based on docstring setting."""
# Set docstring mode based on parameter
editor.docstring_tooltip = docstring_enabled
editor.auto_completer.enable_docstring = docstring_enabled
# Set up mock return values for the Script.get_signatures method
mock_signature = MagicMock()
if docstring_enabled:
mock_signature.docstring.return_value = expected_signature
else:
mock_signature.to_string.return_value = expected_signature
mock_script.return_value.get_signatures.return_value = [mock_signature]
# Simulate user input that would trigger a signature request
test_code = "print("
editor.editor.setText(test_code)
line, index = editor.editor.getCursorPosition()
# Trigger signature request
signature = editor.auto_completer.get_function_signature(line, index, test_code)
# Check if the expected signature is returned
assert signature == expected_signature
def test_open_file(editor):
"""Test open_file method of BECEditor."""
# Create a temporary file with some content
with tempfile.NamedTemporaryFile(delete=False, suffix=".py") as temp_file:
temp_file.write(b"test file content")
# Mock user selecting the file in the dialog
with patch("qtpy.QtWidgets.QFileDialog.getOpenFileName", return_value=(temp_file.name, "")):
with patch("builtins.open", new_callable=mock_open, read_data="test file content"):
editor.open_file()
# Verify if the editor's text is set to the file content
assert editor.editor.text() == "test file content"
# Clean up by removing the temporary file
os.remove(temp_file.name)
def test_save_file(editor):
"""Test save_file method of BECEditor."""
# Set some text in the editor
editor.editor.setText("test save content")
# Mock user selecting the file in the dialog
with patch(
"qtpy.QtWidgets.QFileDialog.getSaveFileName", return_value=("/path/to/save/file.py", "")
):
with patch("builtins.open", new_callable=mock_open) as mock_file:
editor.save_file()
# Verify if the file was opened correctly for writing
mock_file.assert_called_with("/path/to/save/file.py", "w")
# Verify if the editor's text was written to the file
mock_file().write.assert_called_with("test save content")
def test_open_file_through_toolbar(editor):
"""Test the open_file method through the ModularToolBar."""
# Create a temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=".py") as temp_file:
temp_file.write(b"test file content")
# Find the open file action in the toolbar
open_action = find_action_by_text(editor.toolbar, "Open File")
assert open_action is not None, "Open File action should be found"
# Mock the file dialog and built-in open function
with patch("qtpy.QtWidgets.QFileDialog.getOpenFileName", return_value=(temp_file.name, "")):
with patch("builtins.open", new_callable=mock_open, read_data="test file content"):
open_action.trigger()
# Verify if the editor's text is set to the file content
assert editor.editor.text() == "test file content"
# Clean up
os.remove(temp_file.name)
def test_save_file_through_toolbar(editor):
"""Test the save_file method through the ModularToolBar."""
# Set some text in the editor
editor.editor.setText("test save content")
# Find the save file action in the toolbar
save_action = find_action_by_text(editor.toolbar, "Save File")
assert save_action is not None, "Save File action should be found"
# Mock the file dialog and built-in open function
with patch(
"qtpy.QtWidgets.QFileDialog.getSaveFileName", return_value=("/path/to/save/file.py", "")
):
with patch("builtins.open", new_callable=mock_open) as mock_file:
save_action.trigger()
# Verify if the file was opened correctly for writing
mock_file.assert_called_with("/path/to/save/file.py", "w")
# Verify if the editor's text was written to the file
mock_file().write.assert_called_with("test save content")