mirror of
https://github.com/bec-project/bec_widgets.git
synced 2026-04-15 13:10:54 +02:00
Compare commits
13 Commits
feature/vs
...
v0.49.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6d13a3283b | ||
| ab8537483d | |||
| a22229849c | |||
|
|
1ba266080c | ||
| 6500a00682 | |||
| 9602085f82 | |||
|
|
a1c369de9b | ||
| 6238693ffb | |||
|
|
f3a387e77f | ||
| 71cb80d544 | |||
| 77ff7962cc | |||
|
|
a516b1b247 | ||
| 67a99a1a19 |
34
CHANGELOG.md
34
CHANGELOG.md
@@ -2,6 +2,40 @@
|
||||
|
||||
<!--next-version-placeholder-->
|
||||
|
||||
## v0.49.1 (2024-04-26)
|
||||
|
||||
### Fix
|
||||
|
||||
* **widgets/editor:** Qscintilla editor removed ([`ab85374`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/ab8537483da6c87cb9a0b0f01706208c964f292d))
|
||||
|
||||
## v0.49.0 (2024-04-24)
|
||||
|
||||
### Feature
|
||||
|
||||
* **rpc/client_utils:** Timeout for rpc response ([`6500a00`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/6500a00682a2a7ca535a138bd9496ed8470856a8))
|
||||
|
||||
### Fix
|
||||
|
||||
* **rpc/client_utils:** Close clean up policy for BECFigure ([`9602085`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/9602085f82cbc983f89b5bfe48bf35f08438fa87))
|
||||
|
||||
## v0.48.0 (2024-04-24)
|
||||
|
||||
### Feature
|
||||
|
||||
* **cli:** Added auto updates plugin support ([`6238693`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/6238693ffb44b47a56b969bc4129f2af7a2c04fe))
|
||||
|
||||
## v0.47.0 (2024-04-23)
|
||||
|
||||
### Feature
|
||||
|
||||
* **utils/thread_checker:** Util class to check the thread leakage for closeEvent in qt ([`71cb80d`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/71cb80d544c5f4ef499379a431ce0c17907c7ce8))
|
||||
|
||||
## v0.46.7 (2024-04-21)
|
||||
|
||||
### Fix
|
||||
|
||||
* **plot/image:** Monitors are now validated with current bec session ([`67a99a1`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/67a99a1a19c261f9a1f09635f274cd9fbfe53639))
|
||||
|
||||
## v0.46.6 (2024-04-19)
|
||||
|
||||
### Fix
|
||||
|
||||
@@ -1 +1,2 @@
|
||||
from .auto_updates import AutoUpdates, ScanInfo
|
||||
from .client import BECFigure
|
||||
|
||||
118
bec_widgets/cli/auto_updates.py
Normal file
118
bec_widgets/cli/auto_updates.py
Normal file
@@ -0,0 +1,118 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .client import BECFigure
|
||||
|
||||
|
||||
class ScanInfo(BaseModel):
|
||||
scan_id: str
|
||||
scan_number: int
|
||||
scan_name: str
|
||||
scan_report_devices: list
|
||||
monitored_devices: list
|
||||
status: str
|
||||
|
||||
|
||||
class AutoUpdates:
|
||||
def __init__(self, figure: BECFigure, enabled: bool = True):
|
||||
self.enabled = enabled
|
||||
self.figure = figure
|
||||
|
||||
@staticmethod
|
||||
def get_scan_info(msg) -> ScanInfo:
|
||||
"""
|
||||
Update the script with the given data.
|
||||
"""
|
||||
info = msg.info
|
||||
status = msg.status
|
||||
scan_id = msg.scan_id
|
||||
scan_number = info.get("scan_number", 0)
|
||||
scan_name = info.get("scan_name", "Unknown")
|
||||
scan_report_devices = info.get("scan_report_devices", [])
|
||||
monitored_devices = info.get("readout_priority", {}).get("monitored", [])
|
||||
monitored_devices = [dev for dev in monitored_devices if dev not in scan_report_devices]
|
||||
return ScanInfo(
|
||||
scan_id=scan_id,
|
||||
scan_number=scan_number,
|
||||
scan_name=scan_name,
|
||||
scan_report_devices=scan_report_devices,
|
||||
monitored_devices=monitored_devices,
|
||||
status=status,
|
||||
)
|
||||
|
||||
def run(self, msg):
|
||||
"""
|
||||
Run the update function if enabled.
|
||||
"""
|
||||
if not self.enabled:
|
||||
return
|
||||
if msg.status != "open":
|
||||
return
|
||||
info = self.get_scan_info(msg)
|
||||
self.handler(info)
|
||||
|
||||
@staticmethod
|
||||
def get_selected_device(monitored_devices, selected_device):
|
||||
"""
|
||||
Get the selected device for the plot. If no device is selected, the first
|
||||
device in the monitored devices list is selected.
|
||||
"""
|
||||
if selected_device:
|
||||
return selected_device
|
||||
if len(monitored_devices) > 0:
|
||||
sel_device = monitored_devices[0]
|
||||
return sel_device
|
||||
return None
|
||||
|
||||
def handler(self, info: ScanInfo) -> None:
|
||||
"""
|
||||
Default update function.
|
||||
"""
|
||||
if info.scan_name == "line_scan" and info.scan_report_devices:
|
||||
self.simple_line_scan(info)
|
||||
return
|
||||
if info.scan_name == "grid_scan" and info.scan_report_devices:
|
||||
self.simple_grid_scan(info)
|
||||
return
|
||||
if info.scan_report_devices:
|
||||
self.best_effort(info)
|
||||
return
|
||||
|
||||
def simple_line_scan(self, info: ScanInfo) -> None:
|
||||
"""
|
||||
Simple line scan.
|
||||
"""
|
||||
dev_x = info.scan_report_devices[0]
|
||||
dev_y = self.get_selected_device(info.monitored_devices, self.figure.selected_device)
|
||||
if not dev_y:
|
||||
return
|
||||
self.figure.clear_all()
|
||||
plt = self.figure.plot(dev_x, dev_y)
|
||||
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
|
||||
|
||||
def simple_grid_scan(self, info: ScanInfo) -> None:
|
||||
"""
|
||||
Simple grid scan.
|
||||
"""
|
||||
dev_x = info.scan_report_devices[0]
|
||||
dev_y = info.scan_report_devices[1]
|
||||
dev_z = self.get_selected_device(info.monitored_devices, self.figure.selected_device)
|
||||
self.figure.clear_all()
|
||||
plt = self.figure.plot(dev_x, dev_y, dev_z, label=f"Scan {info.scan_number}")
|
||||
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
|
||||
|
||||
def best_effort(self, info: ScanInfo) -> None:
|
||||
"""
|
||||
Best effort scan.
|
||||
"""
|
||||
dev_x = info.scan_report_devices[0]
|
||||
dev_y = self.get_selected_device(info.monitored_devices, self.figure.selected_device)
|
||||
if not dev_y:
|
||||
return
|
||||
self.figure.clear_all()
|
||||
plt = self.figure.plot(dev_x, dev_y, label=f"Scan {info.scan_number}")
|
||||
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
|
||||
BIN
bec_widgets/cli/bec_widgets_icon.png
Normal file
BIN
bec_widgets/cli/bec_widgets_icon.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 5.6 KiB |
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib
|
||||
import importlib.metadata as imd
|
||||
import os
|
||||
import select
|
||||
import subprocess
|
||||
@@ -17,6 +18,7 @@ from bec_lib.device import DeviceBase
|
||||
from qtpy.QtCore import QCoreApplication
|
||||
|
||||
import bec_widgets.cli.client as client
|
||||
from bec_widgets.cli.auto_updates import AutoUpdates
|
||||
from bec_widgets.utils.bec_dispatcher import BECDispatcher
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -54,68 +56,22 @@ def rpc_call(func):
|
||||
return wrapper
|
||||
|
||||
|
||||
def get_selected_device(monitored_devices, selected_device):
|
||||
"""
|
||||
Get the selected device for the plot. If no device is selected, the first
|
||||
device in the monitored devices list is selected.
|
||||
"""
|
||||
if selected_device:
|
||||
return selected_device
|
||||
if len(monitored_devices) > 0:
|
||||
sel_device = monitored_devices[0]
|
||||
return sel_device
|
||||
return None
|
||||
|
||||
|
||||
def update_script(figure: BECFigure, msg):
|
||||
"""
|
||||
Update the script with the given data.
|
||||
"""
|
||||
info = msg.info
|
||||
status = msg.status
|
||||
scan_id = msg.scan_id
|
||||
scan_number = info.get("scan_number", 0)
|
||||
scan_name = info.get("scan_name", "Unknown")
|
||||
scan_report_devices = info.get("scan_report_devices", [])
|
||||
monitored_devices = info.get("readout_priority", {}).get("monitored", [])
|
||||
monitored_devices = [dev for dev in monitored_devices if dev not in scan_report_devices]
|
||||
|
||||
if scan_name == "line_scan" and scan_report_devices:
|
||||
dev_x = scan_report_devices[0]
|
||||
dev_y = get_selected_device(monitored_devices, figure.selected_device)
|
||||
print(f"Selected device: {dev_y}")
|
||||
if not dev_y:
|
||||
return
|
||||
figure.clear_all()
|
||||
plt = figure.plot(dev_x, dev_y)
|
||||
plt.set(title=f"Scan {scan_number}", x_label=dev_x, y_label=dev_y)
|
||||
elif scan_name == "grid_scan" and scan_report_devices:
|
||||
print(f"Scan {scan_number} is running")
|
||||
dev_x = scan_report_devices[0]
|
||||
dev_y = scan_report_devices[1]
|
||||
dev_z = get_selected_device(monitored_devices, figure.selected_device)
|
||||
figure.clear_all()
|
||||
plt = figure.plot(dev_x, dev_y, dev_z, label=f"Scan {scan_number}")
|
||||
plt.set(title=f"Scan {scan_number}", x_label=dev_x, y_label=dev_y)
|
||||
elif scan_report_devices:
|
||||
dev_x = scan_report_devices[0]
|
||||
dev_y = get_selected_device(monitored_devices, figure.selected_device)
|
||||
if not dev_y:
|
||||
return
|
||||
figure.clear_all()
|
||||
plt = figure.plot(dev_x, dev_y, label=f"Scan {scan_number}")
|
||||
plt.set(title=f"Scan {scan_number}", x_label=dev_x, y_label=dev_y)
|
||||
|
||||
|
||||
class BECFigureClientMixin:
|
||||
def __init__(self, **kwargs) -> None:
|
||||
super().__init__(**kwargs)
|
||||
self._process = None
|
||||
self.update_script = update_script
|
||||
self.update_script = self._get_update_script()
|
||||
self._target_endpoint = MessageEndpoints.scan_status()
|
||||
self._selected_device = None
|
||||
self.stderr_output = []
|
||||
|
||||
def _get_update_script(self) -> AutoUpdates:
|
||||
eps = imd.entry_points(group="bec.widgets.auto_updates")
|
||||
for ep in eps:
|
||||
if ep.name == "plugin_widgets_update":
|
||||
return ep.load()(figure=self)
|
||||
return None
|
||||
|
||||
@property
|
||||
def selected_device(self):
|
||||
"""
|
||||
@@ -147,8 +103,7 @@ class BECFigureClientMixin:
|
||||
if isinstance(msg, messages.ScanStatusMessage):
|
||||
if not self.gui_is_alive():
|
||||
return
|
||||
if msg.status == "open":
|
||||
self.update_script(self, msg)
|
||||
self.update_script.run(msg)
|
||||
|
||||
def show(self) -> None:
|
||||
"""
|
||||
@@ -166,7 +121,10 @@ class BECFigureClientMixin:
|
||||
"""
|
||||
if self._process is None:
|
||||
return
|
||||
self._run_rpc("close", (), wait_for_rpc_response=False)
|
||||
if self.gui_is_alive():
|
||||
self._run_rpc("close", (), wait_for_rpc_response=True)
|
||||
else:
|
||||
self._run_rpc("close", (), wait_for_rpc_response=False)
|
||||
self._process.terminate()
|
||||
self._process_output_processing_thread.join()
|
||||
self._process = None
|
||||
@@ -212,6 +170,15 @@ class BECFigureClientMixin:
|
||||
self.stderr_output.append(self._process.stderr.read(1024))
|
||||
|
||||
|
||||
class RPCResponseTimeoutError(Exception):
|
||||
"""Exception raised when an RPC response is not received within the expected time."""
|
||||
|
||||
def __init__(self, request_id, timeout):
|
||||
super().__init__(
|
||||
f"RPC response not received within {timeout} seconds for request ID {request_id}"
|
||||
)
|
||||
|
||||
|
||||
class RPCBase:
|
||||
def __init__(self, gui_id: str = None, config: dict = None, parent=None) -> None:
|
||||
self._client = BECDispatcher().client
|
||||
@@ -257,7 +224,7 @@ class RPCBase:
|
||||
parameter={"args": args, "kwargs": kwargs, "gui_id": self._gui_id},
|
||||
metadata={"request_id": request_id},
|
||||
)
|
||||
# print(f"RPCBase: {rpc_msg}")
|
||||
|
||||
# pylint: disable=protected-access
|
||||
receiver = self._root._gui_id
|
||||
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
|
||||
@@ -292,16 +259,28 @@ class RPCBase:
|
||||
return cls(parent=self, **msg_result)
|
||||
return msg_result
|
||||
|
||||
def _wait_for_response(self, request_id):
|
||||
def _wait_for_response(self, request_id: str, timeout: int = 5):
|
||||
"""
|
||||
Wait for the response from the server.
|
||||
Args:
|
||||
request_id(str): The request ID.
|
||||
timeout(int): The timeout in seconds.
|
||||
|
||||
Returns:
|
||||
The response from the server.
|
||||
"""
|
||||
start_time = time.time()
|
||||
response = None
|
||||
while response is None and self.gui_is_alive():
|
||||
|
||||
while response is None and self.gui_is_alive() and (time.time() - start_time) < timeout:
|
||||
response = self._client.connector.get(
|
||||
MessageEndpoints.gui_instruction_response(request_id)
|
||||
)
|
||||
QCoreApplication.processEvents() # keep UI responsive (and execute signals/slots)
|
||||
time.sleep(0.1)
|
||||
if response is None and (time.time() - start_time) >= timeout:
|
||||
raise RPCResponseTimeoutError(request_id, timeout)
|
||||
|
||||
return response
|
||||
|
||||
def gui_is_alive(self):
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import inspect
|
||||
import threading
|
||||
import time
|
||||
|
||||
from bec_lib import MessageEndpoints, messages
|
||||
from qtpy.QtCore import QTimer
|
||||
@@ -109,17 +111,25 @@ class BECWidgetsCLIServer:
|
||||
def shutdown(self):
|
||||
self._shutdown_event = True
|
||||
self._heartbeat_timer.stop()
|
||||
self.client.shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
import argparse
|
||||
import sys
|
||||
|
||||
from qtpy.QtCore import QSize
|
||||
from qtpy.QtGui import QIcon
|
||||
from qtpy.QtWidgets import QApplication, QMainWindow
|
||||
|
||||
app = QApplication(sys.argv)
|
||||
app.setApplicationName("BEC Figure")
|
||||
icon = QIcon()
|
||||
icon.addFile("bec_widgets_icon.png", size=QSize(48, 48))
|
||||
app.setWindowIcon(icon)
|
||||
|
||||
win = QMainWindow()
|
||||
win.setWindowTitle("BEC Widgets")
|
||||
|
||||
parser = argparse.ArgumentParser(description="BEC Widgets CLI Server")
|
||||
parser.add_argument("--id", type=str, help="The id of the server")
|
||||
|
||||
@@ -2,6 +2,7 @@ from .bec_connector import BECConnector, ConnectionConfig
|
||||
from .bec_dispatcher import BECDispatcher
|
||||
from .bec_table import BECTable
|
||||
from .colors import Colors
|
||||
from .container_utils import WidgetContainerUtils
|
||||
from .crosshair import Crosshair
|
||||
from .entry_validator import EntryValidator
|
||||
from .rpc_decorator import register_rpc_methods, rpc_public
|
||||
|
||||
@@ -127,3 +127,7 @@ class BECConnector:
|
||||
return self.config.model_dump()
|
||||
else:
|
||||
return self.config
|
||||
|
||||
def closeEvent(self, event):
|
||||
self.client.shutdown()
|
||||
super().closeEvent(event)
|
||||
|
||||
45
bec_widgets/utils/container_utils.py
Normal file
45
bec_widgets/utils/container_utils.py
Normal file
@@ -0,0 +1,45 @@
|
||||
import itertools
|
||||
from typing import Type
|
||||
|
||||
from qtpy.QtWidgets import QWidget
|
||||
|
||||
|
||||
class WidgetContainerUtils:
|
||||
|
||||
@staticmethod
|
||||
def generate_unique_widget_id(container: dict, prefix: str = "widget") -> str:
|
||||
"""
|
||||
Generate a unique widget ID.
|
||||
Args:
|
||||
container(dict): The container of widgets.
|
||||
prefix(str): The prefix of the widget ID.
|
||||
|
||||
Returns:
|
||||
widget_id(str): The unique widget ID.
|
||||
"""
|
||||
existing_ids = set(container.keys())
|
||||
for i in itertools.count(1):
|
||||
widget_id = f"{prefix}_{i}"
|
||||
if widget_id not in existing_ids:
|
||||
return widget_id
|
||||
|
||||
@staticmethod
|
||||
def find_first_widget_by_class(
|
||||
container: dict, widget_class: Type[QWidget], can_fail: bool = True
|
||||
) -> QWidget | None:
|
||||
"""
|
||||
Find the first widget of a given class in the figure.
|
||||
Args:
|
||||
container(dict): The container of widgets.
|
||||
widget_class(Type): The class of the widget to find.
|
||||
can_fail(bool): If True, the method will return None if no widget is found. If False, it will raise an error.
|
||||
Returns:
|
||||
widget: The widget of the given class.
|
||||
"""
|
||||
for widget_id, widget in container.items():
|
||||
if isinstance(widget, widget_class):
|
||||
return widget
|
||||
if can_fail:
|
||||
return None
|
||||
else:
|
||||
raise ValueError(f"No widget of class {widget_class} found.")
|
||||
@@ -3,6 +3,15 @@ class EntryValidator:
|
||||
self.devices = devices
|
||||
|
||||
def validate_signal(self, name: str, entry: str = None) -> str:
|
||||
"""
|
||||
Validate a signal entry for a given device. If the entry is not provided, the first signal entry will be used from the device hints.
|
||||
Args:
|
||||
name(str): Device name
|
||||
entry(str): Signal entry
|
||||
|
||||
Returns:
|
||||
str: Signal entry
|
||||
"""
|
||||
if name not in self.devices:
|
||||
raise ValueError(f"Device '{name}' not found in current BEC session")
|
||||
|
||||
@@ -15,3 +24,17 @@ class EntryValidator:
|
||||
raise ValueError(f"Entry '{entry}' not found in device '{name}' signals")
|
||||
|
||||
return entry
|
||||
|
||||
def validate_monitor(self, monitor: str) -> str:
|
||||
"""
|
||||
Validate a monitor entry for a given device.
|
||||
Args:
|
||||
monitor(str): Monitor entry
|
||||
|
||||
Returns:
|
||||
str: Monitor entry
|
||||
"""
|
||||
if monitor not in self.devices:
|
||||
raise ValueError(f"Device '{monitor}' not found in current BEC session")
|
||||
|
||||
return monitor
|
||||
|
||||
37
bec_widgets/utils/thread_checker.py
Normal file
37
bec_widgets/utils/thread_checker.py
Normal file
@@ -0,0 +1,37 @@
|
||||
import threading
|
||||
|
||||
|
||||
class ThreadTracker:
|
||||
def __init__(self, exclude_names=None):
|
||||
self.exclude_names = exclude_names if exclude_names else []
|
||||
self.initial_threads = self._capture_threads()
|
||||
|
||||
def _capture_threads(self):
|
||||
return set(
|
||||
th
|
||||
for th in threading.enumerate()
|
||||
if not any(ex_name in th.name for ex_name in self.exclude_names)
|
||||
and th is not threading.main_thread()
|
||||
)
|
||||
|
||||
def _thread_info(self, threads):
|
||||
return ", \n".join(f"{th.name}(ID: {th.ident})" for th in threads)
|
||||
|
||||
def check_unfinished_threads(self):
|
||||
current_threads = self._capture_threads()
|
||||
additional_threads = current_threads - self.initial_threads
|
||||
closed_threads = self.initial_threads - current_threads
|
||||
if additional_threads:
|
||||
raise Exception(
|
||||
f"###### Initial threads ######:\n {self._thread_info(self.initial_threads)}\n"
|
||||
f"###### Current threads ######:\n {self._thread_info(current_threads)}\n"
|
||||
f"###### Closed threads ######:\n {self._thread_info(closed_threads)}\n"
|
||||
f"###### Unfinished threads ######:\n {self._thread_info(additional_threads)}"
|
||||
)
|
||||
else:
|
||||
print(
|
||||
"All threads properly closed.\n"
|
||||
f"###### Initial threads ######:\n {self._thread_info(self.initial_threads)}\n"
|
||||
f"###### Current threads ######:\n {self._thread_info(current_threads)}\n"
|
||||
f"###### Closed threads ######:\n {self._thread_info(closed_threads)}"
|
||||
)
|
||||
@@ -1,4 +1,3 @@
|
||||
from .editor import BECEditor
|
||||
from .figure import BECFigure, FigureConfig
|
||||
from .monitor import BECMonitor
|
||||
from .motor_control import (
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
from .editor import BECEditor
|
||||
@@ -1,407 +0,0 @@
|
||||
import subprocess
|
||||
|
||||
import qdarktheme
|
||||
from jedi import Script
|
||||
from jedi.api import Completion
|
||||
from qtconsole.manager import QtKernelManager
|
||||
from qtconsole.rich_jupyter_widget import RichJupyterWidget
|
||||
|
||||
# pylint: disable=no-name-in-module
|
||||
from qtpy.Qsci import QsciAPIs, QsciLexerPython, QsciScintilla
|
||||
from qtpy.QtCore import Qt, QThread, Signal
|
||||
from qtpy.QtGui import QColor, QFont
|
||||
from qtpy.QtWidgets import QApplication, QFileDialog, QSplitter, QTextEdit, QVBoxLayout, QWidget
|
||||
|
||||
from bec_widgets.widgets.toolbar import ModularToolBar
|
||||
|
||||
|
||||
class AutoCompleter(QThread):
|
||||
"""Initializes the AutoCompleter thread for handling autocompletion and signature help.
|
||||
|
||||
Args:
|
||||
file_path (str): The path to the file for which autocompletion is required.
|
||||
api (QsciAPIs): The QScintilla API instance used for managing autocompletions.
|
||||
enable_docstring (bool, optional): Flag to determine if docstrings should be included in the signatures.
|
||||
"""
|
||||
|
||||
def __init__(self, file_path: str, api: QsciAPIs, enable_docstring: bool = False):
|
||||
super().__init__(None)
|
||||
self.file_path = file_path
|
||||
self.script: Script = None
|
||||
self.api: QsciAPIs = api
|
||||
self.completions: list[Completion] = None
|
||||
self.line = 0
|
||||
self.index = 0
|
||||
self.text = ""
|
||||
|
||||
# TODO so far disabled, quite buggy, docstring extraction has to be generalised
|
||||
self.enable_docstring = enable_docstring
|
||||
|
||||
def update_script(self, text: str):
|
||||
"""Updates the script for Jedi completion based on the current editor text.
|
||||
|
||||
Args:
|
||||
text (str): The current text of the editor.
|
||||
"""
|
||||
if self.script is None or self.script.path != text:
|
||||
self.script = Script(text, path=self.file_path)
|
||||
|
||||
def run(self):
|
||||
"""Runs the thread for generating autocompletions. Overrides QThread.run."""
|
||||
self.update_script(self.text)
|
||||
try:
|
||||
self.completions = self.script.complete(self.line, self.index)
|
||||
self.load_autocomplete(self.completions)
|
||||
except Exception as err:
|
||||
print(err)
|
||||
self.finished.emit()
|
||||
|
||||
def get_function_signature(self, line: int, index: int, text: str) -> str:
|
||||
"""Fetches the function signature for a given position in the text.
|
||||
|
||||
Args:
|
||||
line (int): The line number in the editor.
|
||||
index (int): The index (column number) in the line.
|
||||
text (str): The current text of the editor.
|
||||
|
||||
Returns:
|
||||
str: A string containing the function signature or an empty string if not available.
|
||||
"""
|
||||
self.update_script(text)
|
||||
try:
|
||||
signatures = self.script.get_signatures(line, index)
|
||||
if signatures and self.enable_docstring is True:
|
||||
full_docstring = signatures[0].docstring(raw=True)
|
||||
compact_docstring = self.get_compact_docstring(full_docstring)
|
||||
return compact_docstring
|
||||
if signatures and self.enable_docstring is False:
|
||||
return signatures[0].to_string()
|
||||
except Exception as err:
|
||||
print(f"Signature Error:{err}")
|
||||
return ""
|
||||
|
||||
def load_autocomplete(self, completions: list):
|
||||
"""Loads the autocomplete suggestions into the QScintilla API.
|
||||
|
||||
Args:
|
||||
completions (list[Completion]): A list of Completion objects to be added to the API.
|
||||
"""
|
||||
self.api.clear()
|
||||
for i in completions:
|
||||
self.api.add(i.name)
|
||||
self.api.prepare()
|
||||
|
||||
def get_completions(self, line: int, index: int, text: str):
|
||||
"""Starts the autocompletion process for a given position in the text.
|
||||
|
||||
Args:
|
||||
line (int): The line number in the editor.
|
||||
index (int): The index (column number) in the line.
|
||||
text (str): The current text of the editor.
|
||||
"""
|
||||
self.line = line
|
||||
self.index = index
|
||||
self.text = text
|
||||
self.start()
|
||||
|
||||
def get_compact_docstring(self, full_docstring):
|
||||
"""Generates a compact version of a function's docstring.
|
||||
|
||||
Args:
|
||||
full_docstring (str): The full docstring of a function.
|
||||
|
||||
Returns:
|
||||
str: A compact version of the docstring.
|
||||
"""
|
||||
lines = full_docstring.split("\n")
|
||||
# TODO make it also for different docstring styles, now it is only for numpy style
|
||||
cutoff_indices = [
|
||||
i
|
||||
for i, line in enumerate(lines)
|
||||
if line.strip().lower() in ["parameters", "returns", "examples", "see also", "warnings"]
|
||||
]
|
||||
|
||||
if cutoff_indices:
|
||||
lines = lines[: cutoff_indices[0]]
|
||||
|
||||
compact_docstring = "\n".join(lines).strip()
|
||||
return compact_docstring
|
||||
|
||||
|
||||
class ScriptRunnerThread(QThread):
|
||||
"""Initializes the thread for running a Python script.
|
||||
|
||||
Args:
|
||||
script (str): The script to be executed.
|
||||
"""
|
||||
|
||||
outputSignal = Signal(str)
|
||||
|
||||
def __init__(self, script):
|
||||
super().__init__()
|
||||
self.script = script
|
||||
|
||||
def run(self):
|
||||
"""Executes the script in a subprocess and emits output through a signal. Overrides QThread.run."""
|
||||
process = subprocess.Popen(
|
||||
["python", "-u", "-c", self.script],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
bufsize=1,
|
||||
universal_newlines=True,
|
||||
text=True,
|
||||
)
|
||||
|
||||
while True:
|
||||
output = process.stdout.readline()
|
||||
if output == "" and process.poll() is not None:
|
||||
break
|
||||
if output:
|
||||
self.outputSignal.emit(output)
|
||||
error = process.communicate()[1]
|
||||
if error:
|
||||
self.outputSignal.emit(error)
|
||||
|
||||
|
||||
class BECEditor(QWidget):
|
||||
"""Initializes the BEC Editor widget.
|
||||
|
||||
Args:
|
||||
toolbar_enabled (bool, optional): Determines if the toolbar should be enabled. Defaults to True.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, toolbar_enabled=True, jupyter_terminal_enabled=False, docstring_tooltip=False
|
||||
):
|
||||
super().__init__()
|
||||
|
||||
self.script_runner_thread = None
|
||||
self.file_path = None
|
||||
self.docstring_tooltip = docstring_tooltip
|
||||
self.jupyter_terminal_enabled = jupyter_terminal_enabled
|
||||
# TODO just temporary solution, could be extended to other languages
|
||||
self.is_python_file = True
|
||||
|
||||
# Initialize the editor and terminal
|
||||
self.editor = QsciScintilla()
|
||||
if self.jupyter_terminal_enabled:
|
||||
self.terminal = self.make_jupyter_widget_with_kernel()
|
||||
else:
|
||||
self.terminal = QTextEdit()
|
||||
self.terminal.setReadOnly(True)
|
||||
|
||||
# Layout
|
||||
self.layout = QVBoxLayout()
|
||||
|
||||
# Initialize and add the toolbar if enabled
|
||||
if toolbar_enabled:
|
||||
self.toolbar = ModularToolBar(self)
|
||||
self.layout.addWidget(self.toolbar)
|
||||
|
||||
# Initialize the splitter
|
||||
self.splitter = QSplitter(Qt.Orientation.Vertical, self)
|
||||
self.splitter.addWidget(self.editor)
|
||||
self.splitter.addWidget(self.terminal)
|
||||
self.splitter.setSizes([400, 200])
|
||||
|
||||
# Add Splitter to layout
|
||||
self.layout.addWidget(self.splitter)
|
||||
self.setLayout(self.layout)
|
||||
|
||||
self.setup_editor()
|
||||
|
||||
def setup_editor(self):
|
||||
"""Sets up the editor with necessary configurations like lexer, auto indentation, and line numbers."""
|
||||
# Set the lexer for Python
|
||||
self.lexer = QsciLexerPython()
|
||||
self.editor.setLexer(self.lexer)
|
||||
|
||||
# Enable auto indentation and competition within the editor
|
||||
self.editor.setAutoIndent(True)
|
||||
self.editor.setIndentationsUseTabs(False)
|
||||
self.editor.setIndentationWidth(4)
|
||||
self.editor.setAutoCompletionSource(QsciScintilla.AutoCompletionSource.AcsAll)
|
||||
self.editor.setAutoCompletionThreshold(1)
|
||||
|
||||
# Autocomplete for python file
|
||||
# Connect cursor position change signal for autocompletion
|
||||
self.editor.cursorPositionChanged.connect(self.on_cursor_position_changed)
|
||||
|
||||
# if self.is_python_file: #TODO can be changed depending on supported languages
|
||||
self.__api = QsciAPIs(self.lexer)
|
||||
self.auto_completer = AutoCompleter(
|
||||
self.editor.text(), self.__api, enable_docstring=self.docstring_tooltip
|
||||
)
|
||||
self.auto_completer.finished.connect(self.loaded_autocomplete)
|
||||
|
||||
# Enable line numbers in the margin
|
||||
self.editor.setMarginType(0, QsciScintilla.MarginType.NumberMargin)
|
||||
self.editor.setMarginWidth(0, "0000") # Adjust the width as needed
|
||||
|
||||
# Additional UI elements like menu for load/save can be added here
|
||||
self.set_editor_style()
|
||||
|
||||
@staticmethod
|
||||
def make_jupyter_widget_with_kernel() -> object:
|
||||
"""Start a kernel, connect to it, and create a RichJupyterWidget to use it"""
|
||||
kernel_manager = QtKernelManager(kernel_name="python3")
|
||||
kernel_manager.start_kernel()
|
||||
|
||||
kernel_client = kernel_manager.client()
|
||||
kernel_client.start_channels()
|
||||
|
||||
jupyter_widget = RichJupyterWidget()
|
||||
jupyter_widget.set_default_style("linux")
|
||||
jupyter_widget.kernel_manager = kernel_manager
|
||||
jupyter_widget.kernel_client = kernel_client
|
||||
return jupyter_widget
|
||||
|
||||
def show_call_tip(self, position):
|
||||
"""Shows a call tip at the given position in the editor.
|
||||
|
||||
Args:
|
||||
position (int): The position in the editor where the call tip should be shown.
|
||||
"""
|
||||
line, index = self.editor.lineIndexFromPosition(position)
|
||||
signature = self.auto_completer.get_function_signature(line + 1, index, self.editor.text())
|
||||
if signature:
|
||||
self.editor.showUserList(1, [signature])
|
||||
|
||||
def on_cursor_position_changed(self, line, index):
|
||||
"""Handles the event of cursor position change in the editor.
|
||||
|
||||
Args:
|
||||
line (int): The current line number where the cursor is.
|
||||
index (int): The current column index where the cursor is.
|
||||
"""
|
||||
# if self.is_python_file: #TODO can be changed depending on supported languages
|
||||
# Get completions
|
||||
self.auto_completer.get_completions(line + 1, index, self.editor.text())
|
||||
self.editor.autoCompleteFromAPIs()
|
||||
|
||||
# Show call tip - signature
|
||||
position = self.editor.positionFromLineIndex(line, index)
|
||||
self.show_call_tip(position)
|
||||
|
||||
def loaded_autocomplete(self):
|
||||
"""Placeholder method for actions after autocompletion data is loaded."""
|
||||
|
||||
def set_editor_style(self):
|
||||
"""Sets the style and color scheme for the editor."""
|
||||
# Dracula Theme Colors
|
||||
background_color = QColor("#282a36")
|
||||
text_color = QColor("#f8f8f2")
|
||||
keyword_color = QColor("#8be9fd")
|
||||
string_color = QColor("#f1fa8c")
|
||||
comment_color = QColor("#6272a4")
|
||||
class_function_color = QColor("#50fa7b")
|
||||
|
||||
# Set Font
|
||||
font = QFont()
|
||||
font.setFamily("Consolas")
|
||||
font.setPointSize(10)
|
||||
self.editor.setFont(font)
|
||||
self.editor.setMarginsFont(font)
|
||||
|
||||
# Set Editor Colors
|
||||
self.editor.setMarginsBackgroundColor(background_color)
|
||||
self.editor.setMarginsForegroundColor(text_color)
|
||||
self.editor.setCaretForegroundColor(text_color)
|
||||
self.editor.setCaretLineBackgroundColor(QColor("#44475a"))
|
||||
self.editor.setPaper(background_color) # Set the background color for the entire paper
|
||||
self.editor.setColor(text_color)
|
||||
|
||||
# Set editor
|
||||
# Syntax Highlighting Colors
|
||||
lexer = self.editor.lexer()
|
||||
if lexer:
|
||||
lexer.setDefaultPaper(background_color) # Set the background color for the text area
|
||||
lexer.setDefaultColor(text_color)
|
||||
lexer.setColor(keyword_color, QsciLexerPython.Keyword)
|
||||
lexer.setColor(string_color, QsciLexerPython.DoubleQuotedString)
|
||||
lexer.setColor(string_color, QsciLexerPython.SingleQuotedString)
|
||||
lexer.setColor(comment_color, QsciLexerPython.Comment)
|
||||
lexer.setColor(class_function_color, QsciLexerPython.ClassName)
|
||||
lexer.setColor(class_function_color, QsciLexerPython.FunctionMethodName)
|
||||
|
||||
# Set the style for all text to have a transparent background
|
||||
# TODO find better way how to do it!
|
||||
for style in range(
|
||||
128
|
||||
): # QsciScintilla supports 128 styles by default, this set all to transparent background
|
||||
self.lexer.setPaper(background_color, style)
|
||||
|
||||
def run_script(self):
|
||||
"""Runs the current script in the editor."""
|
||||
if self.jupyter_terminal_enabled:
|
||||
script = self.editor.text()
|
||||
self.terminal.execute(script)
|
||||
|
||||
else:
|
||||
script = self.editor.text()
|
||||
self.script_runner_thread = ScriptRunnerThread(script)
|
||||
self.script_runner_thread.outputSignal.connect(self.update_terminal)
|
||||
self.script_runner_thread.start()
|
||||
|
||||
def update_terminal(self, text):
|
||||
"""Updates the terminal with new text.
|
||||
|
||||
Args:
|
||||
text (str): The text to be appended to the terminal.
|
||||
"""
|
||||
self.terminal.append(text)
|
||||
|
||||
def enable_docstring_tooltip(self):
|
||||
"""Enables the docstring tooltip."""
|
||||
self.docstring_tooltip = True
|
||||
self.auto_completer.enable_docstring = True
|
||||
|
||||
def open_file(self):
|
||||
"""Opens a file dialog for selecting and opening a Python file in the editor."""
|
||||
options = QFileDialog.Options()
|
||||
options |= QFileDialog.DontUseNativeDialog
|
||||
file_path, _ = QFileDialog.getOpenFileName(
|
||||
self, "Open file", "", "Python files (*.py);;All Files (*)", options=options
|
||||
)
|
||||
|
||||
if not file_path:
|
||||
return
|
||||
try:
|
||||
with open(file_path, "r") as file:
|
||||
text = file.read()
|
||||
self.editor.setText(text)
|
||||
except FileNotFoundError:
|
||||
print(f"The file {file_path} was not found.")
|
||||
except Exception as e:
|
||||
print(f"An error occurred while opening the file {file_path}: {e}")
|
||||
|
||||
def save_file(self):
|
||||
"""Opens a save file dialog for saving the current script in the editor."""
|
||||
options = QFileDialog.Options()
|
||||
options |= QFileDialog.DontUseNativeDialog
|
||||
file_path, _ = QFileDialog.getSaveFileName(
|
||||
self, "Save file", "", "Python files (*.py);;All Files (*)", options=options
|
||||
)
|
||||
|
||||
if not file_path:
|
||||
return
|
||||
try:
|
||||
if not file_path.endswith(".py"):
|
||||
file_path += ".py"
|
||||
|
||||
with open(file_path, "w") as file:
|
||||
text = self.editor.text()
|
||||
file.write(text)
|
||||
print(f"File saved to {file_path}")
|
||||
except Exception as e:
|
||||
print(f"An error occurred while saving the file to {file_path}: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
app = QApplication([])
|
||||
qdarktheme.setup_theme("auto")
|
||||
|
||||
mainWin = BECEditor(jupyter_terminal_enabled=True)
|
||||
|
||||
mainWin.show()
|
||||
app.exec()
|
||||
@@ -14,7 +14,7 @@ from pyqtgraph.Qt import uic
|
||||
from qtpy.QtCore import Signal as pyqtSignal
|
||||
from qtpy.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget
|
||||
|
||||
from bec_widgets.utils import BECConnector, BECDispatcher, ConnectionConfig
|
||||
from bec_widgets.utils import BECConnector, BECDispatcher, ConnectionConfig, WidgetContainerUtils
|
||||
from bec_widgets.widgets.plots import (
|
||||
BECImageShow,
|
||||
BECMotorMap,
|
||||
@@ -188,7 +188,7 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
|
||||
config(dict): Additional configuration for the widget.
|
||||
**axis_kwargs(dict): Additional axis properties to set on the widget after creation.
|
||||
"""
|
||||
widget_id = self._generate_unique_widget_id()
|
||||
widget_id = WidgetContainerUtils.generate_unique_widget_id(self._widgets)
|
||||
waveform = self.add_widget(
|
||||
widget_type="Waveform1D",
|
||||
widget_id=widget_id,
|
||||
@@ -278,7 +278,9 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
|
||||
Returns:
|
||||
BECWaveform: The waveform plot widget.
|
||||
"""
|
||||
waveform = self._find_first_widget_by_class(BECWaveform, can_fail=True)
|
||||
waveform = WidgetContainerUtils.find_first_widget_by_class(
|
||||
self._widgets, BECWaveform, can_fail=True
|
||||
)
|
||||
if waveform is not None:
|
||||
if axis_kwargs:
|
||||
waveform.set(**axis_kwargs)
|
||||
@@ -355,7 +357,9 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
|
||||
Returns:
|
||||
BECImageShow: The image widget.
|
||||
"""
|
||||
image = self._find_first_widget_by_class(BECImageShow, can_fail=True)
|
||||
image = WidgetContainerUtils.find_first_widget_by_class(
|
||||
self._widgets, BECImageShow, can_fail=True
|
||||
)
|
||||
if image is not None:
|
||||
if axis_kwargs:
|
||||
image.set(**axis_kwargs)
|
||||
@@ -410,7 +414,7 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
|
||||
BECImageShow: The image widget.
|
||||
"""
|
||||
|
||||
widget_id = self._generate_unique_widget_id()
|
||||
widget_id = WidgetContainerUtils.generate_unique_widget_id(self._widgets)
|
||||
if config is None:
|
||||
config = ImageConfig(
|
||||
widget_class="BECImageShow",
|
||||
@@ -457,7 +461,9 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
|
||||
Returns:
|
||||
BECMotorMap: The motor map widget.
|
||||
"""
|
||||
motor_map = self._find_first_widget_by_class(BECMotorMap, can_fail=True)
|
||||
motor_map = WidgetContainerUtils.find_first_widget_by_class(
|
||||
self._widgets, BECMotorMap, can_fail=True
|
||||
)
|
||||
if motor_map is not None:
|
||||
if axis_kwargs:
|
||||
motor_map.set(**axis_kwargs)
|
||||
@@ -491,7 +497,7 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
|
||||
Returns:
|
||||
BECMotorMap: The motor map widget.
|
||||
"""
|
||||
widget_id = self._generate_unique_widget_id()
|
||||
widget_id = WidgetContainerUtils.generate_unique_widget_id(self._widgets)
|
||||
if config is None:
|
||||
config = MotorMapConfig(
|
||||
widget_class="BECMotorMap",
|
||||
@@ -532,7 +538,7 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
|
||||
**axis_kwargs(dict): Additional axis properties to set on the widget after creation.
|
||||
"""
|
||||
if not widget_id:
|
||||
widget_id = self._generate_unique_widget_id()
|
||||
widget_id = WidgetContainerUtils.generate_unique_widget_id(self._widgets)
|
||||
if widget_id in self._widgets:
|
||||
raise ValueError(f"Widget with ID '{widget_id}' already exists.")
|
||||
|
||||
@@ -610,25 +616,6 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
|
||||
self.setBackground("k" if theme == "dark" else "w")
|
||||
self.config.theme = theme
|
||||
|
||||
def _find_first_widget_by_class(
|
||||
self, widget_class: Type[BECPlotBase], can_fail: bool = True
|
||||
) -> BECPlotBase | None:
|
||||
"""
|
||||
Find the first widget of a given class in the figure.
|
||||
Args:
|
||||
widget_class(Type[BECPlotBase]): The class of the widget to find.
|
||||
can_fail(bool): If True, the method will return None if no widget is found. If False, it will raise an error.
|
||||
Returns:
|
||||
BECPlotBase: The widget of the given class.
|
||||
"""
|
||||
for widget_id, widget in self._widgets.items():
|
||||
if isinstance(widget, widget_class):
|
||||
return widget
|
||||
if can_fail:
|
||||
return None
|
||||
else:
|
||||
raise ValueError(f"No widget of class {widget_class} found.")
|
||||
|
||||
def _remove_by_coordinates(self, row: int, col: int) -> None:
|
||||
"""
|
||||
Remove a widget from the figure by its coordinates.
|
||||
@@ -695,14 +682,6 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
|
||||
row += 1
|
||||
return row, col
|
||||
|
||||
def _generate_unique_widget_id(self):
|
||||
"""Generate a unique widget ID."""
|
||||
existing_ids = set(self._widgets.keys())
|
||||
for i in itertools.count(1):
|
||||
widget_id = f"widget_{i}"
|
||||
if widget_id not in existing_ids:
|
||||
return widget_id
|
||||
|
||||
def _change_grid(self, widget_id: str, row: int, col: int):
|
||||
"""
|
||||
Change the grid to reflect the new position of the widget.
|
||||
|
||||
@@ -12,7 +12,7 @@ from qtpy.QtCore import Signal as pyqtSignal
|
||||
from qtpy.QtCore import Slot as pyqtSlot
|
||||
from qtpy.QtWidgets import QWidget
|
||||
|
||||
from bec_widgets.utils import BECConnector, ConnectionConfig
|
||||
from bec_widgets.utils import BECConnector, ConnectionConfig, EntryValidator
|
||||
|
||||
from .plot_base import BECPlotBase, WidgetConfig
|
||||
|
||||
@@ -335,7 +335,9 @@ class BECImageShow(BECPlotBase):
|
||||
super().__init__(
|
||||
parent=parent, parent_figure=parent_figure, config=config, client=client, gui_id=gui_id
|
||||
)
|
||||
|
||||
# Get bec shortcuts dev, scans, queue, scan_storage, dap
|
||||
self.get_bec_shortcuts()
|
||||
self.entry_validator = EntryValidator(self.dev)
|
||||
self._images = defaultdict(dict)
|
||||
self.apply_config(self.config)
|
||||
self.processor = ImageProcessor()
|
||||
@@ -507,6 +509,8 @@ class BECImageShow(BECPlotBase):
|
||||
f"Monitor with ID '{monitor}' already exists in widget '{self.gui_id}'."
|
||||
)
|
||||
|
||||
monitor = self.entry_validator.validate_monitor(monitor)
|
||||
|
||||
image_config = ImageItemConfig(
|
||||
widget_class="BECImageItem",
|
||||
parent_id=self.gui_id,
|
||||
@@ -785,6 +789,22 @@ class BECImageShow(BECPlotBase):
|
||||
return True
|
||||
return False
|
||||
|
||||
def _validate_monitor(self, monitor: str, validate_bec: bool = True):
|
||||
"""
|
||||
Validate the monitor name.
|
||||
Args:
|
||||
monitor(str): The name of the monitor.
|
||||
validate_bec(bool): Whether to validate the monitor name with BEC.
|
||||
|
||||
Returns:
|
||||
bool: True if the monitor name is valid, False otherwise.
|
||||
"""
|
||||
if not monitor or monitor == "":
|
||||
return False
|
||||
if validate_bec:
|
||||
return monitor in self.dev
|
||||
return True
|
||||
|
||||
def cleanup(self):
|
||||
"""
|
||||
Clean up the widget.
|
||||
|
||||
18
setup.py
18
setup.py
@@ -1,11 +1,10 @@
|
||||
# pylint: disable= missing-module-docstring
|
||||
from setuptools import find_packages, setup
|
||||
|
||||
__version__ = "0.46.6"
|
||||
__version__ = "0.49.1"
|
||||
|
||||
# Default to PyQt6 if no other Qt binding is installed
|
||||
QT_DEPENDENCY = "PyQt6>=6.0"
|
||||
QSCINTILLA_DEPENDENCY = "PyQt6-QScintilla"
|
||||
QT_DEPENDENCY = "PyQt6<=6.6.3"
|
||||
|
||||
# pylint: disable=unused-import
|
||||
try:
|
||||
@@ -14,15 +13,14 @@ except ImportError:
|
||||
pass
|
||||
else:
|
||||
QT_DEPENDENCY = "PyQt5>=5.9"
|
||||
QSCINTILLA_DEPENDENCY = "QScintilla"
|
||||
|
||||
if __name__ == "__main__":
|
||||
setup(
|
||||
install_requires=[
|
||||
"pydantic",
|
||||
"qtconsole",
|
||||
"PyQt6-Qt6<=6.6.3",
|
||||
QT_DEPENDENCY,
|
||||
QSCINTILLA_DEPENDENCY,
|
||||
"jedi",
|
||||
"qtpy",
|
||||
"pyqtgraph",
|
||||
@@ -43,10 +41,16 @@ if __name__ == "__main__":
|
||||
"isort",
|
||||
],
|
||||
"pyqt5": ["PyQt5>=5.9"],
|
||||
"pyqt6": ["PyQt6>=6.0"],
|
||||
"pyqt6": ["PyQt6<=6.6.3"],
|
||||
},
|
||||
version=__version__,
|
||||
packages=find_packages(),
|
||||
include_package_data=True,
|
||||
package_data={"": ["*.ui", "*.yaml"]},
|
||||
package_data={
|
||||
"": [
|
||||
"*.ui",
|
||||
"*.yaml",
|
||||
"*.png",
|
||||
]
|
||||
},
|
||||
)
|
||||
|
||||
@@ -91,6 +91,7 @@ DEVICES = [
|
||||
FakeDevice("bpm4i"),
|
||||
FakeDevice("bpm3a"),
|
||||
FakeDevice("bpm3i"),
|
||||
FakeDevice("eiger"),
|
||||
]
|
||||
|
||||
|
||||
|
||||
@@ -1,170 +0,0 @@
|
||||
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
from unittest.mock import MagicMock, mock_open, patch
|
||||
|
||||
import pytest
|
||||
from qtpy.Qsci import QsciScintilla
|
||||
from qtpy.QtWidgets import QTextEdit
|
||||
|
||||
from bec_widgets.widgets.editor.editor import AutoCompleter, BECEditor
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def editor(qtbot, docstring_tooltip=False):
|
||||
"""Helper function to set up the BECEditor widget."""
|
||||
widget = BECEditor(toolbar_enabled=True, docstring_tooltip=docstring_tooltip)
|
||||
qtbot.addWidget(widget)
|
||||
qtbot.waitExposed(widget)
|
||||
yield widget
|
||||
|
||||
|
||||
def find_action_by_text(toolbar, text):
|
||||
"""Helper function to find an action in the toolbar by its text."""
|
||||
for action in toolbar.actions():
|
||||
if action.text() == text:
|
||||
return action
|
||||
return None
|
||||
|
||||
|
||||
def test_bec_editor_initialization(editor):
|
||||
"""Test if the BECEditor widget is initialized correctly."""
|
||||
assert isinstance(editor.editor, QsciScintilla)
|
||||
assert isinstance(editor.terminal, QTextEdit)
|
||||
assert isinstance(editor.auto_completer, AutoCompleter)
|
||||
|
||||
|
||||
@patch("bec_widgets.widgets.editor.editor.Script") # Mock the Script class from jedi
|
||||
def test_autocompleter_suggestions(mock_script, editor, qtbot):
|
||||
"""Test if the autocompleter provides correct suggestions based on input."""
|
||||
# Set up mock return values for the Script.complete method
|
||||
mock_completion = MagicMock()
|
||||
mock_completion.name = "mocked_method"
|
||||
mock_script.return_value.complete.return_value = [mock_completion]
|
||||
|
||||
# Simulate user input in the editor
|
||||
test_code = "print("
|
||||
editor.editor.setText(test_code)
|
||||
line, index = editor.editor.getCursorPosition()
|
||||
|
||||
# Trigger autocomplete
|
||||
editor.auto_completer.get_completions(line, index, test_code)
|
||||
|
||||
# Use qtbot to wait for the completion thread
|
||||
qtbot.waitUntil(lambda: editor.auto_completer.completions is not None, timeout=1000)
|
||||
|
||||
# Check if the expected completion is in the autocompleter's suggestions
|
||||
suggested_methods = [completion.name for completion in editor.auto_completer.completions]
|
||||
assert "mocked_method" in suggested_methods
|
||||
|
||||
|
||||
@patch("bec_widgets.widgets.editor.editor.Script") # Mock the Script class from jedi
|
||||
@pytest.mark.parametrize(
|
||||
"docstring_enabled, expected_signature",
|
||||
[(True, "Mocked signature with docstring"), (False, "Mocked signature")],
|
||||
)
|
||||
def test_autocompleter_signature(mock_script, editor, docstring_enabled, expected_signature):
|
||||
"""Test if the autocompleter provides correct function signature based on docstring setting."""
|
||||
# Set docstring mode based on parameter
|
||||
editor.docstring_tooltip = docstring_enabled
|
||||
editor.auto_completer.enable_docstring = docstring_enabled
|
||||
|
||||
# Set up mock return values for the Script.get_signatures method
|
||||
mock_signature = MagicMock()
|
||||
if docstring_enabled:
|
||||
mock_signature.docstring.return_value = expected_signature
|
||||
else:
|
||||
mock_signature.to_string.return_value = expected_signature
|
||||
mock_script.return_value.get_signatures.return_value = [mock_signature]
|
||||
|
||||
# Simulate user input that would trigger a signature request
|
||||
test_code = "print("
|
||||
editor.editor.setText(test_code)
|
||||
line, index = editor.editor.getCursorPosition()
|
||||
|
||||
# Trigger signature request
|
||||
signature = editor.auto_completer.get_function_signature(line, index, test_code)
|
||||
|
||||
# Check if the expected signature is returned
|
||||
assert signature == expected_signature
|
||||
|
||||
|
||||
def test_open_file(editor):
|
||||
"""Test open_file method of BECEditor."""
|
||||
# Create a temporary file with some content
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=".py") as temp_file:
|
||||
temp_file.write(b"test file content")
|
||||
|
||||
# Mock user selecting the file in the dialog
|
||||
with patch("qtpy.QtWidgets.QFileDialog.getOpenFileName", return_value=(temp_file.name, "")):
|
||||
with patch("builtins.open", new_callable=mock_open, read_data="test file content"):
|
||||
editor.open_file()
|
||||
|
||||
# Verify if the editor's text is set to the file content
|
||||
assert editor.editor.text() == "test file content"
|
||||
|
||||
# Clean up by removing the temporary file
|
||||
os.remove(temp_file.name)
|
||||
|
||||
|
||||
def test_save_file(editor):
|
||||
"""Test save_file method of BECEditor."""
|
||||
# Set some text in the editor
|
||||
editor.editor.setText("test save content")
|
||||
|
||||
# Mock user selecting the file in the dialog
|
||||
with patch(
|
||||
"qtpy.QtWidgets.QFileDialog.getSaveFileName", return_value=("/path/to/save/file.py", "")
|
||||
):
|
||||
with patch("builtins.open", new_callable=mock_open) as mock_file:
|
||||
editor.save_file()
|
||||
|
||||
# Verify if the file was opened correctly for writing
|
||||
mock_file.assert_called_with("/path/to/save/file.py", "w")
|
||||
|
||||
# Verify if the editor's text was written to the file
|
||||
mock_file().write.assert_called_with("test save content")
|
||||
|
||||
|
||||
def test_open_file_through_toolbar(editor):
|
||||
"""Test the open_file method through the ModularToolBar."""
|
||||
# Create a temporary file
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=".py") as temp_file:
|
||||
temp_file.write(b"test file content")
|
||||
|
||||
# Find the open file action in the toolbar
|
||||
open_action = find_action_by_text(editor.toolbar, "Open File")
|
||||
assert open_action is not None, "Open File action should be found"
|
||||
|
||||
# Mock the file dialog and built-in open function
|
||||
with patch("qtpy.QtWidgets.QFileDialog.getOpenFileName", return_value=(temp_file.name, "")):
|
||||
with patch("builtins.open", new_callable=mock_open, read_data="test file content"):
|
||||
open_action.trigger()
|
||||
# Verify if the editor's text is set to the file content
|
||||
assert editor.editor.text() == "test file content"
|
||||
|
||||
# Clean up
|
||||
os.remove(temp_file.name)
|
||||
|
||||
|
||||
def test_save_file_through_toolbar(editor):
|
||||
"""Test the save_file method through the ModularToolBar."""
|
||||
# Set some text in the editor
|
||||
editor.editor.setText("test save content")
|
||||
|
||||
# Find the save file action in the toolbar
|
||||
save_action = find_action_by_text(editor.toolbar, "Save File")
|
||||
assert save_action is not None, "Save File action should be found"
|
||||
|
||||
# Mock the file dialog and built-in open function
|
||||
with patch(
|
||||
"qtpy.QtWidgets.QFileDialog.getSaveFileName", return_value=("/path/to/save/file.py", "")
|
||||
):
|
||||
with patch("builtins.open", new_callable=mock_open) as mock_file:
|
||||
save_action.trigger()
|
||||
# Verify if the file was opened correctly for writing
|
||||
mock_file.assert_called_with("/path/to/save/file.py", "w")
|
||||
|
||||
# Verify if the editor's text was written to the file
|
||||
mock_file().write.assert_called_with("test save content")
|
||||
Reference in New Issue
Block a user