1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-14 20:50:55 +02:00

Compare commits

...

26 Commits

Author SHA1 Message Date
semantic-release
dae8a3409a 0.50.2
Automatically generated by python-semantic-release
2024-04-30 16:08:47 +00:00
0dfcaa4b70 fix: 'disconnect_slot' has to be symmetric with 'connect_slot' regarding QtThreadSafeCallback 2024-04-30 17:27:02 +02:00
semantic-release
98cb2c08ea 0.50.1
Automatically generated by python-semantic-release
2024-04-29 15:58:30 +00:00
57cb136a09 fix(cli): BECFigure takes the port to connect to redis from the current BECClient, supporting plugins 2024-04-29 16:53:26 +02:00
semantic-release
1d84bca753 0.50.0
Automatically generated by python-semantic-release
2024-04-29 08:26:54 +00:00
4f261be4c7 test(cli/rpc_register): e2e RPCRegister 2024-04-28 18:54:20 +02:00
40eb75f85a test(cli/rpc_register): rpc_register tests added 2024-04-28 12:47:17 +02:00
13c018a797 fix(widgets/figure): access pattern changed for getting widgets by coordinates for rpc 2024-04-28 12:42:58 +02:00
8f20a0b3b1 fix(plots): cleanup policy reviewed for children items 2024-04-28 12:42:58 +02:00
6b6a6b2249 fix(rpc/client_utils): getoutput more transparent + error handling 2024-04-28 12:42:58 +02:00
2ca32675ec fix(rpc_register): thread lock for listign all connections 2024-04-28 12:42:58 +02:00
381d713837 feat(plots): universal cleanup and remove also for children items 2024-04-28 12:42:58 +02:00
a898e7e4f1 feat(rpc/rpc_register): singleton rpc register for all rpc connections for session 2024-04-28 12:42:58 +02:00
semantic-release
6d13a3283b 0.49.1
Automatically generated by python-semantic-release
2024-04-26 16:17:32 +00:00
ab8537483d fix(widgets/editor): qscintilla editor removed 2024-04-26 17:57:54 +02:00
a22229849c build(pyqt6): fixing PyQt6-Qt6 package to 6.6.3 2024-04-25 17:07:29 +02:00
semantic-release
1ba266080c 0.49.0
Automatically generated by python-semantic-release
2024-04-24 15:57:14 +00:00
6500a00682 feat(rpc/client_utils): timeout for rpc response 2024-04-24 17:49:23 +02:00
9602085f82 fix(rpc/client_utils): close clean up policy for BECFigure 2024-04-24 10:54:24 +02:00
semantic-release
a1c369de9b 0.48.0
Automatically generated by python-semantic-release
2024-04-24 05:29:44 +00:00
6238693ffb feat(cli): added auto updates plugin support 2024-04-23 15:22:45 +02:00
semantic-release
f3a387e77f 0.47.0
Automatically generated by python-semantic-release
2024-04-23 13:12:39 +00:00
71cb80d544 feat(utils/thread_checker): util class to check the thread leakage for closeEvent in qt 2024-04-23 14:53:13 +02:00
77ff7962cc refactor(utils/container_utils): part of the logic regarding locating widgets moved from BECFigure to utility class 2024-04-22 12:07:37 +02:00
semantic-release
a516b1b247 0.46.7
Automatically generated by python-semantic-release
2024-04-21 16:24:50 +00:00
67a99a1a19 fix(plot/image): monitors are now validated with current bec session 2024-04-20 01:35:17 +02:00
33 changed files with 899 additions and 824 deletions

View File

@@ -2,6 +2,66 @@
<!--next-version-placeholder-->
## v0.50.2 (2024-04-30)
### Fix
* 'disconnect_slot' has to be symmetric with 'connect_slot' regarding QtThreadSafeCallback ([`0dfcaa4`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/0dfcaa4b708948af0a40ec7cf34d03ff1e96ffac))
## v0.50.1 (2024-04-29)
### Fix
* **cli:** BECFigure takes the port to connect to redis from the current BECClient, supporting plugins ([`57cb136`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/57cb136a098e87a452414bf44e627edb562f6799))
## v0.50.0 (2024-04-29)
### Feature
* **plots:** Universal cleanup and remove also for children items ([`381d713`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/381d713837bb9217c58ba1d8b89691aa35c9f5ec))
* **rpc/rpc_register:** Singleton rpc register for all rpc connections for session ([`a898e7e`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/a898e7e4f14e9ae854703dddbd1eb8c50cb640ff))
### Fix
* **widgets/figure:** Access pattern changed for getting widgets by coordinates for rpc ([`13c018a`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/13c018a79704a7497c140df57179d294e43ecffa))
* **plots:** Cleanup policy reviewed for children items ([`8f20a0b`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/8f20a0b3b1b5dd117b36b45645717190b9ee9cbf))
* **rpc/client_utils:** Getoutput more transparent + error handling ([`6b6a6b2`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/6b6a6b2249f24d3d02bd5fcd7ef1c63ed794c304))
* **rpc_register:** Thread lock for listign all connections ([`2ca3267`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/2ca32675ec3f00137e2140259db51f6e5aa7bb71))
## v0.49.1 (2024-04-26)
### Fix
* **widgets/editor:** Qscintilla editor removed ([`ab85374`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/ab8537483da6c87cb9a0b0f01706208c964f292d))
## v0.49.0 (2024-04-24)
### Feature
* **rpc/client_utils:** Timeout for rpc response ([`6500a00`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/6500a00682a2a7ca535a138bd9496ed8470856a8))
### Fix
* **rpc/client_utils:** Close clean up policy for BECFigure ([`9602085`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/9602085f82cbc983f89b5bfe48bf35f08438fa87))
## v0.48.0 (2024-04-24)
### Feature
* **cli:** Added auto updates plugin support ([`6238693`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/6238693ffb44b47a56b969bc4129f2af7a2c04fe))
## v0.47.0 (2024-04-23)
### Feature
* **utils/thread_checker:** Util class to check the thread leakage for closeEvent in qt ([`71cb80d`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/71cb80d544c5f4ef499379a431ce0c17907c7ce8))
## v0.46.7 (2024-04-21)
### Fix
* **plot/image:** Monitors are now validated with current bec session ([`67a99a1`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/67a99a1a19c261f9a1f09635f274cd9fbfe53639))
## v0.46.6 (2024-04-19)
### Fix

View File

@@ -1 +1,2 @@
from .auto_updates import AutoUpdates, ScanInfo
from .client import BECFigure

View File

@@ -0,0 +1,118 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from pydantic import BaseModel
if TYPE_CHECKING:
from .client import BECFigure
class ScanInfo(BaseModel):
scan_id: str
scan_number: int
scan_name: str
scan_report_devices: list
monitored_devices: list
status: str
class AutoUpdates:
def __init__(self, figure: BECFigure, enabled: bool = True):
self.enabled = enabled
self.figure = figure
@staticmethod
def get_scan_info(msg) -> ScanInfo:
"""
Update the script with the given data.
"""
info = msg.info
status = msg.status
scan_id = msg.scan_id
scan_number = info.get("scan_number", 0)
scan_name = info.get("scan_name", "Unknown")
scan_report_devices = info.get("scan_report_devices", [])
monitored_devices = info.get("readout_priority", {}).get("monitored", [])
monitored_devices = [dev for dev in monitored_devices if dev not in scan_report_devices]
return ScanInfo(
scan_id=scan_id,
scan_number=scan_number,
scan_name=scan_name,
scan_report_devices=scan_report_devices,
monitored_devices=monitored_devices,
status=status,
)
def run(self, msg):
"""
Run the update function if enabled.
"""
if not self.enabled:
return
if msg.status != "open":
return
info = self.get_scan_info(msg)
self.handler(info)
@staticmethod
def get_selected_device(monitored_devices, selected_device):
"""
Get the selected device for the plot. If no device is selected, the first
device in the monitored devices list is selected.
"""
if selected_device:
return selected_device
if len(monitored_devices) > 0:
sel_device = monitored_devices[0]
return sel_device
return None
def handler(self, info: ScanInfo) -> None:
"""
Default update function.
"""
if info.scan_name == "line_scan" and info.scan_report_devices:
self.simple_line_scan(info)
return
if info.scan_name == "grid_scan" and info.scan_report_devices:
self.simple_grid_scan(info)
return
if info.scan_report_devices:
self.best_effort(info)
return
def simple_line_scan(self, info: ScanInfo) -> None:
"""
Simple line scan.
"""
dev_x = info.scan_report_devices[0]
dev_y = self.get_selected_device(info.monitored_devices, self.figure.selected_device)
if not dev_y:
return
self.figure.clear_all()
plt = self.figure.plot(dev_x, dev_y)
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
def simple_grid_scan(self, info: ScanInfo) -> None:
"""
Simple grid scan.
"""
dev_x = info.scan_report_devices[0]
dev_y = info.scan_report_devices[1]
dev_z = self.get_selected_device(info.monitored_devices, self.figure.selected_device)
self.figure.clear_all()
plt = self.figure.plot(dev_x, dev_y, dev_z, label=f"Scan {info.scan_number}")
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
def best_effort(self, info: ScanInfo) -> None:
"""
Best effort scan.
"""
dev_x = info.scan_report_devices[0]
dev_y = self.get_selected_device(info.monitored_devices, self.figure.selected_device)
if not dev_y:
return
self.figure.clear_all()
plt = self.figure.plot(dev_x, dev_y, label=f"Scan {info.scan_number}")
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.6 KiB

View File

@@ -136,6 +136,13 @@ class BECPlotBase(RPCBase):
class BECWaveform(RPCBase):
@property
@rpc_call
def rpc_id(self) -> "str":
"""
Get the RPC ID of the widget.
"""
@property
@rpc_call
def config_dict(self) -> "dict":
@@ -387,6 +394,13 @@ class BECWaveform(RPCBase):
class BECFigure(RPCBase, BECFigureClientMixin):
@property
@rpc_call
def rpc_id(self) -> "str":
"""
Get the RPC ID of the widget.
"""
@property
@rpc_call
def config_dict(self) -> "dict":
@@ -396,13 +410,16 @@ class BECFigure(RPCBase, BECFigureClientMixin):
dict: The configuration of the widget.
"""
@property
@rpc_call
def axes(self) -> "list[BECPlotBase]":
def axes(self, row: "int", col: "int") -> "BECPlotBase":
"""
Access all widget in BECFigure as a list
Get widget by its coordinates in the figure.
Args:
row(int): the row coordinate
col(int): the column coordinate
Returns:
list[BECPlotBase]: List of all widgets in the figure.
BECPlotBase: the widget at the given coordinates
"""
@property
@@ -614,8 +631,36 @@ class BECFigure(RPCBase, BECFigureClientMixin):
Clear all widgets from the figure and reset to default state
"""
@rpc_call
def get_all_rpc(self) -> "dict":
"""
Get all registered RPC objects.
"""
@property
@rpc_call
def widget_list(self) -> "list[BECPlotBase]":
"""
Access all widget in BECFigure as a list
Returns:
list[BECPlotBase]: List of all widgets in the figure.
"""
class BECCurve(RPCBase):
@rpc_call
def remove(self):
"""
Remove the curve from the plot.
"""
@property
@rpc_call
def rpc_id(self) -> "str":
"""
Get the RPC ID of the widget.
"""
@property
@rpc_call
def config_dict(self) -> "dict":
@@ -713,6 +758,13 @@ class BECCurve(RPCBase):
class BECImageShow(RPCBase):
@property
@rpc_call
def rpc_id(self) -> "str":
"""
Get the RPC ID of the widget.
"""
@property
@rpc_call
def config_dict(self) -> "dict":
@@ -1045,8 +1097,21 @@ class BECConnector(RPCBase):
dict: The configuration of the widget.
"""
@rpc_call
def get_all_rpc(self) -> "dict":
"""
Get all registered RPC objects.
"""
class BECImageItem(RPCBase):
@property
@rpc_call
def rpc_id(self) -> "str":
"""
Get the RPC ID of the widget.
"""
@property
@rpc_call
def config_dict(self) -> "dict":

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import importlib
import importlib.metadata as imd
import os
import select
import subprocess
@@ -11,12 +12,13 @@ import uuid
from functools import wraps
from typing import TYPE_CHECKING
from bec_lib import MessageEndpoints, messages
from bec_lib import MessageEndpoints, ServiceConfig, messages
from bec_lib.connector import MessageObject
from bec_lib.device import DeviceBase
from qtpy.QtCore import QCoreApplication
import bec_widgets.cli.client as client
from bec_widgets.cli.auto_updates import AutoUpdates
from bec_widgets.utils.bec_dispatcher import BECDispatcher
if TYPE_CHECKING:
@@ -54,68 +56,22 @@ def rpc_call(func):
return wrapper
def get_selected_device(monitored_devices, selected_device):
"""
Get the selected device for the plot. If no device is selected, the first
device in the monitored devices list is selected.
"""
if selected_device:
return selected_device
if len(monitored_devices) > 0:
sel_device = monitored_devices[0]
return sel_device
return None
def update_script(figure: BECFigure, msg):
"""
Update the script with the given data.
"""
info = msg.info
status = msg.status
scan_id = msg.scan_id
scan_number = info.get("scan_number", 0)
scan_name = info.get("scan_name", "Unknown")
scan_report_devices = info.get("scan_report_devices", [])
monitored_devices = info.get("readout_priority", {}).get("monitored", [])
monitored_devices = [dev for dev in monitored_devices if dev not in scan_report_devices]
if scan_name == "line_scan" and scan_report_devices:
dev_x = scan_report_devices[0]
dev_y = get_selected_device(monitored_devices, figure.selected_device)
print(f"Selected device: {dev_y}")
if not dev_y:
return
figure.clear_all()
plt = figure.plot(dev_x, dev_y)
plt.set(title=f"Scan {scan_number}", x_label=dev_x, y_label=dev_y)
elif scan_name == "grid_scan" and scan_report_devices:
print(f"Scan {scan_number} is running")
dev_x = scan_report_devices[0]
dev_y = scan_report_devices[1]
dev_z = get_selected_device(monitored_devices, figure.selected_device)
figure.clear_all()
plt = figure.plot(dev_x, dev_y, dev_z, label=f"Scan {scan_number}")
plt.set(title=f"Scan {scan_number}", x_label=dev_x, y_label=dev_y)
elif scan_report_devices:
dev_x = scan_report_devices[0]
dev_y = get_selected_device(monitored_devices, figure.selected_device)
if not dev_y:
return
figure.clear_all()
plt = figure.plot(dev_x, dev_y, label=f"Scan {scan_number}")
plt.set(title=f"Scan {scan_number}", x_label=dev_x, y_label=dev_y)
class BECFigureClientMixin:
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._process = None
self.update_script = update_script
self.update_script = self._get_update_script()
self._target_endpoint = MessageEndpoints.scan_status()
self._selected_device = None
self.stderr_output = []
def _get_update_script(self) -> AutoUpdates:
eps = imd.entry_points(group="bec.widgets.auto_updates")
for ep in eps:
if ep.name == "plugin_widgets_update":
return ep.load()(figure=self)
return None
@property
def selected_device(self):
"""
@@ -147,8 +103,7 @@ class BECFigureClientMixin:
if isinstance(msg, messages.ScanStatusMessage):
if not self.gui_is_alive():
return
if msg.status == "open":
self.update_script(self, msg)
self.update_script.run(msg)
def show(self) -> None:
"""
@@ -166,7 +121,10 @@ class BECFigureClientMixin:
"""
if self._process is None:
return
self._run_rpc("close", (), wait_for_rpc_response=False)
if self.gui_is_alive():
self._run_rpc("close", (), wait_for_rpc_response=True)
else:
self._run_rpc("close", (), wait_for_rpc_response=False)
self._process.terminate()
self._process_output_processing_thread.join()
self._process = None
@@ -178,10 +136,11 @@ class BECFigureClientMixin:
"""
self._start_update_script()
# pylint: disable=subprocess-run-check
config = self._client._service_config.redis
monitor_module = importlib.import_module("bec_widgets.cli.server")
monitor_path = monitor_module.__file__
command = [sys.executable, "-u", monitor_path, "--id", self._gui_id]
command = [sys.executable, "-u", monitor_path, "--id", self._gui_id, "--config", config]
self._process = subprocess.Popen(
command, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
@@ -199,17 +158,33 @@ class BECFigureClientMixin:
self.stderr_output.clear()
def _get_output(self) -> str:
os.set_blocking(self._process.stdout.fileno(), False)
os.set_blocking(self._process.stderr.fileno(), False)
while self._process.poll() is None:
readylist, _, _ = select.select([self._process.stdout, self._process.stderr], [], [], 1)
if self._process.stdout in readylist:
# print("*"*10, self._process.stdout.read(1024), flush=True, end="")
self._process.stdout.read(1024)
if self._process.stderr in readylist:
# print("!"*10, self._process.stderr.read(1024), flush=True, end="", file=sys.stderr)
print(self._process.stderr.read(1024), flush=True, end="", file=sys.stderr)
self.stderr_output.append(self._process.stderr.read(1024))
try:
os.set_blocking(self._process.stdout.fileno(), False)
os.set_blocking(self._process.stderr.fileno(), False)
while self._process.poll() is None:
readylist, _, _ = select.select(
[self._process.stdout, self._process.stderr], [], [], 1
)
if self._process.stdout in readylist:
output = self._process.stdout.read(1024)
if output:
print(output, end="")
if self._process.stderr in readylist:
error_output = self._process.stderr.read(1024)
if error_output:
print(error_output, end="", file=sys.stderr)
self.stderr_output.append(error_output)
except Exception as e:
print(f"Error reading process output: {str(e)}")
class RPCResponseTimeoutError(Exception):
"""Exception raised when an RPC response is not received within the expected time."""
def __init__(self, request_id, timeout):
super().__init__(
f"RPC response not received within {timeout} seconds for request ID {request_id}"
)
class RPCBase:
@@ -257,7 +232,7 @@ class RPCBase:
parameter={"args": args, "kwargs": kwargs, "gui_id": self._gui_id},
metadata={"request_id": request_id},
)
# print(f"RPCBase: {rpc_msg}")
# pylint: disable=protected-access
receiver = self._root._gui_id
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
@@ -292,16 +267,28 @@ class RPCBase:
return cls(parent=self, **msg_result)
return msg_result
def _wait_for_response(self, request_id):
def _wait_for_response(self, request_id: str, timeout: int = 5):
"""
Wait for the response from the server.
Args:
request_id(str): The request ID.
timeout(int): The timeout in seconds.
Returns:
The response from the server.
"""
start_time = time.time()
response = None
while response is None and self.gui_is_alive():
while response is None and self.gui_is_alive() and (time.time() - start_time) < timeout:
response = self._client.connector.get(
MessageEndpoints.gui_instruction_response(request_id)
)
QCoreApplication.processEvents() # keep UI responsive (and execute signals/slots)
time.sleep(0.1)
if response is None and (time.time() - start_time) >= timeout:
raise RPCResponseTimeoutError(request_id, timeout)
return response
def gui_is_alive(self):

View File

@@ -0,0 +1,76 @@
from threading import Lock
from weakref import WeakValueDictionary
from qtpy.QtCore import QObject
class RPCRegister:
"""
A singleton class that keeps track of all the RPC objects registered in the system for CLI usage.
"""
_instance = None
_initialized = False
_lock = Lock()
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(RPCRegister, cls).__new__(cls)
cls._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._rpc_register = WeakValueDictionary()
self._initialized = True
def add_rpc(self, rpc: QObject):
"""
Add an RPC object to the register.
Args:
rpc(QObject): The RPC object to be added to the register.
"""
if not hasattr(rpc, "gui_id"):
raise ValueError("RPC object must have a 'gui_id' attribute.")
self._rpc_register[rpc.gui_id] = rpc
def remove_rpc(self, rpc: str):
"""
Remove an RPC object from the register.
Args:
rpc(str): The RPC object to be removed from the register.
"""
if not hasattr(rpc, "gui_id"):
raise ValueError(f"RPC object {rpc} must have a 'gui_id' attribute.")
self._rpc_register.pop(rpc.gui_id, None)
def get_rpc_by_id(self, gui_id: str) -> QObject:
"""
Get an RPC object by its ID.
Args:
gui_id(str): The ID of the RPC object to be retrieved.
Returns:
QObject: The RPC object with the given ID.
"""
rpc_object = self._rpc_register.get(gui_id, None)
return rpc_object
def list_all_connections(self) -> dict:
"""
List all the registered RPC objects.
Returns:
dict: A dictionary containing all the registered RPC objects.
"""
with self._lock:
connections = dict(self._rpc_register)
return connections
@classmethod
def reset_singleton(cls):
"""
Reset the singleton instance.
"""
cls._instance = None
cls._initialized = False

View File

@@ -1,8 +1,11 @@
import inspect
import threading
import time
from bec_lib import MessageEndpoints, messages
from qtpy.QtCore import QTimer
from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.widgets.figure import BECFigure
@@ -12,12 +15,16 @@ from bec_widgets.widgets.plots import BECCurve, BECImageShow, BECWaveform
class BECWidgetsCLIServer:
WIDGETS = [BECWaveform, BECFigure, BECCurve, BECImageShow]
def __init__(self, gui_id: str = None, dispatcher: BECDispatcher = None, client=None) -> None:
self.dispatcher = BECDispatcher() if dispatcher is None else dispatcher
def __init__(
self, gui_id: str = None, dispatcher: BECDispatcher = None, client=None, config=None
) -> None:
self.dispatcher = BECDispatcher(config=config) if dispatcher is None else dispatcher
self.client = self.dispatcher.client if client is None else client
self.client.start()
self.gui_id = gui_id
self.fig = BECFigure(gui_id=self.gui_id)
self.rpc_register = RPCRegister()
self.rpc_register.add_rpc(self.fig)
self.dispatcher.connect_slot(
self.on_rpc_update, MessageEndpoints.gui_instructions(self.gui_id)
@@ -27,15 +34,15 @@ class BECWidgetsCLIServer:
self._shutdown_event = False
self._heartbeat_timer = QTimer()
self._heartbeat_timer.timeout.connect(self.emit_heartbeat)
self._heartbeat_timer.start(1000) # Emit heartbeat every 1 seconds
self._heartbeat_timer.start(200) # Emit heartbeat every 1 seconds
def on_rpc_update(self, msg: dict, metadata: dict):
request_id = metadata.get("request_id")
try:
obj = self.get_object_from_config(msg["parameter"])
method = msg["action"]
args = msg["parameter"].get("args", [])
kwargs = msg["parameter"].get("kwargs", {})
obj = self.get_object_from_config(msg["parameter"])
res = self.run_rpc(obj, method, args, kwargs)
except Exception as e:
print(e)
@@ -52,20 +59,10 @@ class BECWidgetsCLIServer:
def get_object_from_config(self, config: dict):
gui_id = config.get("gui_id")
# check if the object is the figure
if gui_id == self.fig.gui_id:
return self.fig
# check if the object is a widget
if gui_id in self.fig._widgets:
obj = self.fig._widgets[config["gui_id"]]
return obj
if self.fig._widgets:
for widget in self.fig._widgets.values():
item = widget.find_widget_by_id(gui_id)
if item:
return item
raise ValueError(f"Object with gui_id {gui_id} not found")
obj = self.rpc_register.get_rpc_by_id(gui_id)
if obj is None:
raise ValueError(f"Object with gui_id {gui_id} not found")
return obj
def run_rpc(self, obj, method, args, kwargs):
method_obj = getattr(obj, method)
@@ -104,30 +101,38 @@ class BECWidgetsCLIServer:
messages.StatusMessage(name=self.gui_id, status=1, info={}),
expire=10,
)
print("Heartbeat emitted")
def shutdown(self):
self._shutdown_event = True
self._heartbeat_timer.stop()
self.client.shutdown()
if __name__ == "__main__": # pragma: no cover
import argparse
import sys
from qtpy.QtCore import QSize
from qtpy.QtGui import QIcon
from qtpy.QtWidgets import QApplication, QMainWindow
app = QApplication(sys.argv)
app.setApplicationName("BEC Figure")
icon = QIcon()
icon.addFile("bec_widgets_icon.png", size=QSize(48, 48))
app.setWindowIcon(icon)
win = QMainWindow()
win.setWindowTitle("BEC Widgets")
parser = argparse.ArgumentParser(description="BEC Widgets CLI Server")
parser.add_argument("--id", type=str, help="The id of the server")
parser.add_argument("--config", type=str, help="Config to connect to redis.")
args = parser.parse_args()
server = BECWidgetsCLIServer(gui_id=args.id)
# server = BECWidgetsCLIServer(gui_id="test")
server = BECWidgetsCLIServer(gui_id=args.id, config=args.config)
# server = BECWidgetsCLIServer(gui_id="test",config="awi-bec-dev-01:6379")
fig = server.fig
win.setCentralWidget(fig)

View File

@@ -7,6 +7,7 @@ from qtconsole.inprocess import QtInProcessKernelManager
from qtconsole.rich_jupyter_widget import RichJupyterWidget
from qtpy.QtWidgets import QApplication, QVBoxLayout, QWidget
from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.utils import BECDispatcher
from bec_widgets.widgets import BECFigure
@@ -43,10 +44,14 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
self.safe_close = False
# self.figure.clean_signal.connect(self.confirm_close)
self.register = RPCRegister()
self.register.add_rpc(self.figure)
print("Registered objects:", dict(self.register.list_all_connections()))
# console push
self.console.kernel_manager.kernel.shell.push(
{
"fig": self.figure,
"register": self.register,
"w1": self.w1,
"w2": self.w2,
"w3": self.w3,

View File

@@ -2,6 +2,7 @@ from .bec_connector import BECConnector, ConnectionConfig
from .bec_dispatcher import BECDispatcher
from .bec_table import BECTable
from .colors import Colors
from .container_utils import WidgetContainerUtils
from .crosshair import Crosshair
from .entry_validator import EntryValidator
from .rpc_decorator import register_rpc_methods, rpc_public

View File

@@ -7,6 +7,7 @@ from typing import Optional, Type
from pydantic import BaseModel, Field, field_validator
from qtpy.QtCore import Slot as pyqtSlot
from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.utils.bec_dispatcher import BECDispatcher
@@ -31,7 +32,7 @@ class ConnectionConfig(BaseModel):
class BECConnector:
"""Connection mixin class for all BEC widgets, to handle BEC client and device manager"""
USER_ACCESS = ["config_dict"]
USER_ACCESS = ["config_dict", "get_all_rpc"]
def __init__(self, client=None, config: ConnectionConfig = None, gui_id: str = None):
# BEC related connections
@@ -54,6 +55,25 @@ class BECConnector:
else:
self.gui_id = self.config.gui_id
# register widget to rpc register
self.rpc_register = RPCRegister()
self.rpc_register.add_rpc(self)
def get_all_rpc(self) -> dict:
"""Get all registered RPC objects."""
all_connections = self.rpc_register.list_all_connections()
return dict(all_connections)
@property
def rpc_id(self) -> str:
"""Get the RPC ID of the widget."""
return self.gui_id
@rpc_id.setter
def rpc_id(self, rpc_id: str) -> None:
"""Set the RPC ID of the widget."""
self.gui_id = rpc_id
@property
def config_dict(self) -> dict:
"""
@@ -127,3 +147,7 @@ class BECConnector:
return self.config.model_dump()
else:
return self.config
def closeEvent(self, event):
self.client.shutdown()
super().closeEvent(event)

View File

@@ -6,7 +6,7 @@ from collections.abc import Callable
from typing import TYPE_CHECKING, Union
import redis
from bec_lib import BECClient
from bec_lib import BECClient, ServiceConfig
from bec_lib.redis_connector import MessageObject, RedisConnector
from qtpy.QtCore import QObject
from qtpy.QtCore import Signal as pyqtSignal
@@ -71,13 +71,13 @@ class BECDispatcher:
_instance = None
_initialized = False
def __new__(cls, client=None, *args, **kwargs):
def __new__(cls, client=None, config: str = None, *args, **kwargs):
if cls._instance is None:
cls._instance = super(BECDispatcher, cls).__new__(cls)
cls._initialized = False
return cls._instance
def __init__(self, client=None):
def __init__(self, client=None, config: str = None):
if self._initialized:
return
@@ -85,7 +85,14 @@ class BECDispatcher:
self.client = client
if self.client is None:
self.client = BECClient(connector_cls=QtRedisConnector, forced=True)
if config is not None:
host, port = config.split(":")
redis_config = {"host": host, "port": port}
self.client = BECClient(
config=ServiceConfig(redis=redis_config), connector_cls=QtRedisConnector
) # , forced=True)
else:
self.client = BECClient(connector_cls=QtRedisConnector) # , forced=True)
else:
if self.client.started:
# have to reinitialize client to use proper connector
@@ -122,7 +129,15 @@ class BECDispatcher:
self._slots[slot].update(set(topics_str))
def disconnect_slot(self, slot: Callable, topics: Union[str, list]):
self.client.connector.unregister(topics, cb=slot)
# find the right slot to disconnect from ;
# slot callbacks are wrapped in QtThreadSafeCallback objects,
# but the slot we receive here is the original callable
for connected_slot in self._slots:
if connected_slot.cb == slot:
break
else:
return
self.client.connector.unregister(topics, cb=connected_slot)
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
self._slots[slot].difference_update(set(topics_str))
if not self._slots[slot]:

View File

@@ -0,0 +1,45 @@
import itertools
from typing import Type
from qtpy.QtWidgets import QWidget
class WidgetContainerUtils:
@staticmethod
def generate_unique_widget_id(container: dict, prefix: str = "widget") -> str:
"""
Generate a unique widget ID.
Args:
container(dict): The container of widgets.
prefix(str): The prefix of the widget ID.
Returns:
widget_id(str): The unique widget ID.
"""
existing_ids = set(container.keys())
for i in itertools.count(1):
widget_id = f"{prefix}_{i}"
if widget_id not in existing_ids:
return widget_id
@staticmethod
def find_first_widget_by_class(
container: dict, widget_class: Type[QWidget], can_fail: bool = True
) -> QWidget | None:
"""
Find the first widget of a given class in the figure.
Args:
container(dict): The container of widgets.
widget_class(Type): The class of the widget to find.
can_fail(bool): If True, the method will return None if no widget is found. If False, it will raise an error.
Returns:
widget: The widget of the given class.
"""
for widget_id, widget in container.items():
if isinstance(widget, widget_class):
return widget
if can_fail:
return None
else:
raise ValueError(f"No widget of class {widget_class} found.")

View File

@@ -3,6 +3,15 @@ class EntryValidator:
self.devices = devices
def validate_signal(self, name: str, entry: str = None) -> str:
"""
Validate a signal entry for a given device. If the entry is not provided, the first signal entry will be used from the device hints.
Args:
name(str): Device name
entry(str): Signal entry
Returns:
str: Signal entry
"""
if name not in self.devices:
raise ValueError(f"Device '{name}' not found in current BEC session")
@@ -15,3 +24,17 @@ class EntryValidator:
raise ValueError(f"Entry '{entry}' not found in device '{name}' signals")
return entry
def validate_monitor(self, monitor: str) -> str:
"""
Validate a monitor entry for a given device.
Args:
monitor(str): Monitor entry
Returns:
str: Monitor entry
"""
if monitor not in self.devices:
raise ValueError(f"Device '{monitor}' not found in current BEC session")
return monitor

View File

@@ -0,0 +1,37 @@
import threading
class ThreadTracker:
def __init__(self, exclude_names=None):
self.exclude_names = exclude_names if exclude_names else []
self.initial_threads = self._capture_threads()
def _capture_threads(self):
return set(
th
for th in threading.enumerate()
if not any(ex_name in th.name for ex_name in self.exclude_names)
and th is not threading.main_thread()
)
def _thread_info(self, threads):
return ", \n".join(f"{th.name}(ID: {th.ident})" for th in threads)
def check_unfinished_threads(self):
current_threads = self._capture_threads()
additional_threads = current_threads - self.initial_threads
closed_threads = self.initial_threads - current_threads
if additional_threads:
raise Exception(
f"###### Initial threads ######:\n {self._thread_info(self.initial_threads)}\n"
f"###### Current threads ######:\n {self._thread_info(current_threads)}\n"
f"###### Closed threads ######:\n {self._thread_info(closed_threads)}\n"
f"###### Unfinished threads ######:\n {self._thread_info(additional_threads)}"
)
else:
print(
"All threads properly closed.\n"
f"###### Initial threads ######:\n {self._thread_info(self.initial_threads)}\n"
f"###### Current threads ######:\n {self._thread_info(current_threads)}\n"
f"###### Closed threads ######:\n {self._thread_info(closed_threads)}"
)

View File

@@ -1,4 +1,3 @@
from .editor import BECEditor
from .figure import BECFigure, FigureConfig
from .monitor import BECMonitor
from .motor_control import (

View File

@@ -1 +0,0 @@
from .editor import BECEditor

View File

@@ -1,407 +0,0 @@
import subprocess
import qdarktheme
from jedi import Script
from jedi.api import Completion
from qtconsole.manager import QtKernelManager
from qtconsole.rich_jupyter_widget import RichJupyterWidget
# pylint: disable=no-name-in-module
from qtpy.Qsci import QsciAPIs, QsciLexerPython, QsciScintilla
from qtpy.QtCore import Qt, QThread, Signal
from qtpy.QtGui import QColor, QFont
from qtpy.QtWidgets import QApplication, QFileDialog, QSplitter, QTextEdit, QVBoxLayout, QWidget
from bec_widgets.widgets.toolbar import ModularToolBar
class AutoCompleter(QThread):
"""Initializes the AutoCompleter thread for handling autocompletion and signature help.
Args:
file_path (str): The path to the file for which autocompletion is required.
api (QsciAPIs): The QScintilla API instance used for managing autocompletions.
enable_docstring (bool, optional): Flag to determine if docstrings should be included in the signatures.
"""
def __init__(self, file_path: str, api: QsciAPIs, enable_docstring: bool = False):
super().__init__(None)
self.file_path = file_path
self.script: Script = None
self.api: QsciAPIs = api
self.completions: list[Completion] = None
self.line = 0
self.index = 0
self.text = ""
# TODO so far disabled, quite buggy, docstring extraction has to be generalised
self.enable_docstring = enable_docstring
def update_script(self, text: str):
"""Updates the script for Jedi completion based on the current editor text.
Args:
text (str): The current text of the editor.
"""
if self.script is None or self.script.path != text:
self.script = Script(text, path=self.file_path)
def run(self):
"""Runs the thread for generating autocompletions. Overrides QThread.run."""
self.update_script(self.text)
try:
self.completions = self.script.complete(self.line, self.index)
self.load_autocomplete(self.completions)
except Exception as err:
print(err)
self.finished.emit()
def get_function_signature(self, line: int, index: int, text: str) -> str:
"""Fetches the function signature for a given position in the text.
Args:
line (int): The line number in the editor.
index (int): The index (column number) in the line.
text (str): The current text of the editor.
Returns:
str: A string containing the function signature or an empty string if not available.
"""
self.update_script(text)
try:
signatures = self.script.get_signatures(line, index)
if signatures and self.enable_docstring is True:
full_docstring = signatures[0].docstring(raw=True)
compact_docstring = self.get_compact_docstring(full_docstring)
return compact_docstring
if signatures and self.enable_docstring is False:
return signatures[0].to_string()
except Exception as err:
print(f"Signature Error:{err}")
return ""
def load_autocomplete(self, completions: list):
"""Loads the autocomplete suggestions into the QScintilla API.
Args:
completions (list[Completion]): A list of Completion objects to be added to the API.
"""
self.api.clear()
for i in completions:
self.api.add(i.name)
self.api.prepare()
def get_completions(self, line: int, index: int, text: str):
"""Starts the autocompletion process for a given position in the text.
Args:
line (int): The line number in the editor.
index (int): The index (column number) in the line.
text (str): The current text of the editor.
"""
self.line = line
self.index = index
self.text = text
self.start()
def get_compact_docstring(self, full_docstring):
"""Generates a compact version of a function's docstring.
Args:
full_docstring (str): The full docstring of a function.
Returns:
str: A compact version of the docstring.
"""
lines = full_docstring.split("\n")
# TODO make it also for different docstring styles, now it is only for numpy style
cutoff_indices = [
i
for i, line in enumerate(lines)
if line.strip().lower() in ["parameters", "returns", "examples", "see also", "warnings"]
]
if cutoff_indices:
lines = lines[: cutoff_indices[0]]
compact_docstring = "\n".join(lines).strip()
return compact_docstring
class ScriptRunnerThread(QThread):
"""Initializes the thread for running a Python script.
Args:
script (str): The script to be executed.
"""
outputSignal = Signal(str)
def __init__(self, script):
super().__init__()
self.script = script
def run(self):
"""Executes the script in a subprocess and emits output through a signal. Overrides QThread.run."""
process = subprocess.Popen(
["python", "-u", "-c", self.script],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1,
universal_newlines=True,
text=True,
)
while True:
output = process.stdout.readline()
if output == "" and process.poll() is not None:
break
if output:
self.outputSignal.emit(output)
error = process.communicate()[1]
if error:
self.outputSignal.emit(error)
class BECEditor(QWidget):
"""Initializes the BEC Editor widget.
Args:
toolbar_enabled (bool, optional): Determines if the toolbar should be enabled. Defaults to True.
"""
def __init__(
self, toolbar_enabled=True, jupyter_terminal_enabled=False, docstring_tooltip=False
):
super().__init__()
self.script_runner_thread = None
self.file_path = None
self.docstring_tooltip = docstring_tooltip
self.jupyter_terminal_enabled = jupyter_terminal_enabled
# TODO just temporary solution, could be extended to other languages
self.is_python_file = True
# Initialize the editor and terminal
self.editor = QsciScintilla()
if self.jupyter_terminal_enabled:
self.terminal = self.make_jupyter_widget_with_kernel()
else:
self.terminal = QTextEdit()
self.terminal.setReadOnly(True)
# Layout
self.layout = QVBoxLayout()
# Initialize and add the toolbar if enabled
if toolbar_enabled:
self.toolbar = ModularToolBar(self)
self.layout.addWidget(self.toolbar)
# Initialize the splitter
self.splitter = QSplitter(Qt.Orientation.Vertical, self)
self.splitter.addWidget(self.editor)
self.splitter.addWidget(self.terminal)
self.splitter.setSizes([400, 200])
# Add Splitter to layout
self.layout.addWidget(self.splitter)
self.setLayout(self.layout)
self.setup_editor()
def setup_editor(self):
"""Sets up the editor with necessary configurations like lexer, auto indentation, and line numbers."""
# Set the lexer for Python
self.lexer = QsciLexerPython()
self.editor.setLexer(self.lexer)
# Enable auto indentation and competition within the editor
self.editor.setAutoIndent(True)
self.editor.setIndentationsUseTabs(False)
self.editor.setIndentationWidth(4)
self.editor.setAutoCompletionSource(QsciScintilla.AutoCompletionSource.AcsAll)
self.editor.setAutoCompletionThreshold(1)
# Autocomplete for python file
# Connect cursor position change signal for autocompletion
self.editor.cursorPositionChanged.connect(self.on_cursor_position_changed)
# if self.is_python_file: #TODO can be changed depending on supported languages
self.__api = QsciAPIs(self.lexer)
self.auto_completer = AutoCompleter(
self.editor.text(), self.__api, enable_docstring=self.docstring_tooltip
)
self.auto_completer.finished.connect(self.loaded_autocomplete)
# Enable line numbers in the margin
self.editor.setMarginType(0, QsciScintilla.MarginType.NumberMargin)
self.editor.setMarginWidth(0, "0000") # Adjust the width as needed
# Additional UI elements like menu for load/save can be added here
self.set_editor_style()
@staticmethod
def make_jupyter_widget_with_kernel() -> object:
"""Start a kernel, connect to it, and create a RichJupyterWidget to use it"""
kernel_manager = QtKernelManager(kernel_name="python3")
kernel_manager.start_kernel()
kernel_client = kernel_manager.client()
kernel_client.start_channels()
jupyter_widget = RichJupyterWidget()
jupyter_widget.set_default_style("linux")
jupyter_widget.kernel_manager = kernel_manager
jupyter_widget.kernel_client = kernel_client
return jupyter_widget
def show_call_tip(self, position):
"""Shows a call tip at the given position in the editor.
Args:
position (int): The position in the editor where the call tip should be shown.
"""
line, index = self.editor.lineIndexFromPosition(position)
signature = self.auto_completer.get_function_signature(line + 1, index, self.editor.text())
if signature:
self.editor.showUserList(1, [signature])
def on_cursor_position_changed(self, line, index):
"""Handles the event of cursor position change in the editor.
Args:
line (int): The current line number where the cursor is.
index (int): The current column index where the cursor is.
"""
# if self.is_python_file: #TODO can be changed depending on supported languages
# Get completions
self.auto_completer.get_completions(line + 1, index, self.editor.text())
self.editor.autoCompleteFromAPIs()
# Show call tip - signature
position = self.editor.positionFromLineIndex(line, index)
self.show_call_tip(position)
def loaded_autocomplete(self):
"""Placeholder method for actions after autocompletion data is loaded."""
def set_editor_style(self):
"""Sets the style and color scheme for the editor."""
# Dracula Theme Colors
background_color = QColor("#282a36")
text_color = QColor("#f8f8f2")
keyword_color = QColor("#8be9fd")
string_color = QColor("#f1fa8c")
comment_color = QColor("#6272a4")
class_function_color = QColor("#50fa7b")
# Set Font
font = QFont()
font.setFamily("Consolas")
font.setPointSize(10)
self.editor.setFont(font)
self.editor.setMarginsFont(font)
# Set Editor Colors
self.editor.setMarginsBackgroundColor(background_color)
self.editor.setMarginsForegroundColor(text_color)
self.editor.setCaretForegroundColor(text_color)
self.editor.setCaretLineBackgroundColor(QColor("#44475a"))
self.editor.setPaper(background_color) # Set the background color for the entire paper
self.editor.setColor(text_color)
# Set editor
# Syntax Highlighting Colors
lexer = self.editor.lexer()
if lexer:
lexer.setDefaultPaper(background_color) # Set the background color for the text area
lexer.setDefaultColor(text_color)
lexer.setColor(keyword_color, QsciLexerPython.Keyword)
lexer.setColor(string_color, QsciLexerPython.DoubleQuotedString)
lexer.setColor(string_color, QsciLexerPython.SingleQuotedString)
lexer.setColor(comment_color, QsciLexerPython.Comment)
lexer.setColor(class_function_color, QsciLexerPython.ClassName)
lexer.setColor(class_function_color, QsciLexerPython.FunctionMethodName)
# Set the style for all text to have a transparent background
# TODO find better way how to do it!
for style in range(
128
): # QsciScintilla supports 128 styles by default, this set all to transparent background
self.lexer.setPaper(background_color, style)
def run_script(self):
"""Runs the current script in the editor."""
if self.jupyter_terminal_enabled:
script = self.editor.text()
self.terminal.execute(script)
else:
script = self.editor.text()
self.script_runner_thread = ScriptRunnerThread(script)
self.script_runner_thread.outputSignal.connect(self.update_terminal)
self.script_runner_thread.start()
def update_terminal(self, text):
"""Updates the terminal with new text.
Args:
text (str): The text to be appended to the terminal.
"""
self.terminal.append(text)
def enable_docstring_tooltip(self):
"""Enables the docstring tooltip."""
self.docstring_tooltip = True
self.auto_completer.enable_docstring = True
def open_file(self):
"""Opens a file dialog for selecting and opening a Python file in the editor."""
options = QFileDialog.Options()
options |= QFileDialog.DontUseNativeDialog
file_path, _ = QFileDialog.getOpenFileName(
self, "Open file", "", "Python files (*.py);;All Files (*)", options=options
)
if not file_path:
return
try:
with open(file_path, "r") as file:
text = file.read()
self.editor.setText(text)
except FileNotFoundError:
print(f"The file {file_path} was not found.")
except Exception as e:
print(f"An error occurred while opening the file {file_path}: {e}")
def save_file(self):
"""Opens a save file dialog for saving the current script in the editor."""
options = QFileDialog.Options()
options |= QFileDialog.DontUseNativeDialog
file_path, _ = QFileDialog.getSaveFileName(
self, "Save file", "", "Python files (*.py);;All Files (*)", options=options
)
if not file_path:
return
try:
if not file_path.endswith(".py"):
file_path += ".py"
with open(file_path, "w") as file:
text = self.editor.text()
file.write(text)
print(f"File saved to {file_path}")
except Exception as e:
print(f"An error occurred while saving the file to {file_path}: {e}")
if __name__ == "__main__": # pragma: no cover
app = QApplication([])
qdarktheme.setup_theme("auto")
mainWin = BECEditor(jupyter_terminal_enabled=True)
mainWin.show()
app.exec()

View File

@@ -14,7 +14,7 @@ from pyqtgraph.Qt import uic
from qtpy.QtCore import Signal as pyqtSignal
from qtpy.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget
from bec_widgets.utils import BECConnector, BECDispatcher, ConnectionConfig
from bec_widgets.utils import BECConnector, BECDispatcher, ConnectionConfig, WidgetContainerUtils
from bec_widgets.widgets.plots import (
BECImageShow,
BECMotorMap,
@@ -97,6 +97,7 @@ class WidgetHandler:
class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
USER_ACCESS = [
"rpc_id",
"config_dict",
"axes",
"widgets",
@@ -110,6 +111,8 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
"change_layout",
"change_theme",
"clear_all",
"get_all_rpc",
"widget_list",
]
clean_signal = pyqtSignal()
@@ -138,8 +141,21 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
# Container to keep track of the grid
self.grid = []
def __getitem__(self, key: tuple | str):
if isinstance(key, tuple) and len(key) == 2:
return self.axes(*key)
elif isinstance(key, str):
widget = self._widgets.get(key)
if widget is None:
raise KeyError(f"No widget with ID {key}")
return self._widgets.get(key)
else:
raise TypeError(
"Key must be a string (widget id) or a tuple of two integers (grid coordinates)"
)
@property
def axes(self) -> list[BECPlotBase]:
def widget_list(self) -> list[BECPlotBase]:
"""
Access all widget in BECFigure as a list
Returns:
@@ -148,8 +164,8 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
axes = [value for value in self._widgets.values() if isinstance(value, BECPlotBase)]
return axes
@axes.setter
def axes(self, value: list[BECPlotBase]):
@widget_list.setter
def widget_list(self, value: list[BECPlotBase]):
self._axes = value
@property
@@ -188,7 +204,7 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
config(dict): Additional configuration for the widget.
**axis_kwargs(dict): Additional axis properties to set on the widget after creation.
"""
widget_id = self._generate_unique_widget_id()
widget_id = WidgetContainerUtils.generate_unique_widget_id(self._widgets)
waveform = self.add_widget(
widget_type="Waveform1D",
widget_id=widget_id,
@@ -278,7 +294,9 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
Returns:
BECWaveform: The waveform plot widget.
"""
waveform = self._find_first_widget_by_class(BECWaveform, can_fail=True)
waveform = WidgetContainerUtils.find_first_widget_by_class(
self._widgets, BECWaveform, can_fail=True
)
if waveform is not None:
if axis_kwargs:
waveform.set(**axis_kwargs)
@@ -355,7 +373,9 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
Returns:
BECImageShow: The image widget.
"""
image = self._find_first_widget_by_class(BECImageShow, can_fail=True)
image = WidgetContainerUtils.find_first_widget_by_class(
self._widgets, BECImageShow, can_fail=True
)
if image is not None:
if axis_kwargs:
image.set(**axis_kwargs)
@@ -410,7 +430,7 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
BECImageShow: The image widget.
"""
widget_id = self._generate_unique_widget_id()
widget_id = WidgetContainerUtils.generate_unique_widget_id(self._widgets)
if config is None:
config = ImageConfig(
widget_class="BECImageShow",
@@ -457,7 +477,9 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
Returns:
BECMotorMap: The motor map widget.
"""
motor_map = self._find_first_widget_by_class(BECMotorMap, can_fail=True)
motor_map = WidgetContainerUtils.find_first_widget_by_class(
self._widgets, BECMotorMap, can_fail=True
)
if motor_map is not None:
if axis_kwargs:
motor_map.set(**axis_kwargs)
@@ -491,7 +513,7 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
Returns:
BECMotorMap: The motor map widget.
"""
widget_id = self._generate_unique_widget_id()
widget_id = WidgetContainerUtils.generate_unique_widget_id(self._widgets)
if config is None:
config = MotorMapConfig(
widget_class="BECMotorMap",
@@ -532,7 +554,7 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
**axis_kwargs(dict): Additional axis properties to set on the widget after creation.
"""
if not widget_id:
widget_id = self._generate_unique_widget_id()
widget_id = WidgetContainerUtils.generate_unique_widget_id(self._widgets)
if widget_id in self._widgets:
raise ValueError(f"Widget with ID '{widget_id}' already exists.")
@@ -610,25 +632,6 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
self.setBackground("k" if theme == "dark" else "w")
self.config.theme = theme
def _find_first_widget_by_class(
self, widget_class: Type[BECPlotBase], can_fail: bool = True
) -> BECPlotBase | None:
"""
Find the first widget of a given class in the figure.
Args:
widget_class(Type[BECPlotBase]): The class of the widget to find.
can_fail(bool): If True, the method will return None if no widget is found. If False, it will raise an error.
Returns:
BECPlotBase: The widget of the given class.
"""
for widget_id, widget in self._widgets.items():
if isinstance(widget, widget_class):
return widget
if can_fail:
return None
else:
raise ValueError(f"No widget of class {widget_class} found.")
def _remove_by_coordinates(self, row: int, col: int) -> None:
"""
Remove a widget from the figure by its coordinates.
@@ -636,7 +639,7 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
row(int): The row coordinate of the widget to remove.
col(int): The column coordinate of the widget to remove.
"""
widget = self._get_widget_by_coordinates(row, col)
widget = self.axes(row, col)
if widget:
widget_id = widget.config.gui_id
if widget_id in self._widgets:
@@ -656,24 +659,10 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
self._reindex_grid()
if widget_id in self.config.widgets:
self.config.widgets.pop(widget_id)
print(f"Removed widget {widget_id}.")
else:
raise ValueError(f"Widget with ID '{widget_id}' does not exist.")
def __getitem__(self, key: tuple | str):
if isinstance(key, tuple) and len(key) == 2:
return self._get_widget_by_coordinates(*key)
elif isinstance(key, str):
widget = self._widgets.get(key)
if widget is None:
raise KeyError(f"No widget with ID {key}")
return self._widgets.get(key)
else:
raise TypeError(
"Key must be a string (widget id) or a tuple of two integers (grid coordinates)"
)
def _get_widget_by_coordinates(self, row: int, col: int) -> BECPlotBase:
def axes(self, row: int, col: int) -> BECPlotBase:
"""
Get widget by its coordinates in the figure.
Args:
@@ -695,14 +684,6 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
row += 1
return row, col
def _generate_unique_widget_id(self):
"""Generate a unique widget ID."""
existing_ids = set(self._widgets.keys())
for i in itertools.count(1):
widget_id = f"widget_{i}"
if widget_id not in existing_ids:
return widget_id
def _change_grid(self, widget_id: str, row: int, col: int):
"""
Change the grid to reflect the new position of the widget.
@@ -720,7 +701,6 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
def _reindex_grid(self):
"""Reindex the grid to remove empty rows and columns."""
print(f"old grid: {self.grid}")
new_grid = []
for row in self.grid:
new_row = [widget for widget in row if widget is not None]
@@ -787,8 +767,8 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
def clear_all(self):
"""Clear all widgets from the figure and reset to default state"""
# for widget in self._widgets.values():
# widget.cleanup()
for widget in self._widgets.values():
widget.cleanup()
self.clear()
self._widgets = defaultdict(dict)
self.grid = []

View File

@@ -12,7 +12,7 @@ from qtpy.QtCore import Signal as pyqtSignal
from qtpy.QtCore import Slot as pyqtSlot
from qtpy.QtWidgets import QWidget
from bec_widgets.utils import BECConnector, ConnectionConfig
from bec_widgets.utils import BECConnector, ConnectionConfig, EntryValidator
from .plot_base import BECPlotBase, WidgetConfig
@@ -59,6 +59,7 @@ class ImageConfig(WidgetConfig):
class BECImageItem(BECConnector, pg.ImageItem):
USER_ACCESS = [
"rpc_id",
"config_dict",
"set",
"set_fft",
@@ -287,9 +288,14 @@ class BECImageItem(BECConnector, pg.ImageItem):
else:
raise ValueError("style should be 'simple' or 'full'")
def cleanup(self):
"""Clean up widget."""
self.rpc_register.remove_rpc(self)
class BECImageShow(BECPlotBase):
USER_ACCESS = [
"rpc_id",
"config_dict",
"add_image_by_config",
"get_image_config",
@@ -335,7 +341,9 @@ class BECImageShow(BECPlotBase):
super().__init__(
parent=parent, parent_figure=parent_figure, config=config, client=client, gui_id=gui_id
)
# Get bec shortcuts dev, scans, queue, scan_storage, dap
self.get_bec_shortcuts()
self.entry_validator = EntryValidator(self.dev)
self._images = defaultdict(dict)
self.apply_config(self.config)
self.processor = ImageProcessor()
@@ -356,20 +364,6 @@ class BECImageShow(BECPlotBase):
thread.start()
def find_widget_by_id(self, item_id: str) -> BECImageItem:
"""
Find the widget by its gui_id.
Args:
item_id(str): The gui_id of the widget.
Returns:
BECImageItem: The widget with the given gui_id.
"""
for source, images in self._images.items():
for monitor, image_item in images.items():
if image_item.gui_id == item_id:
return image_item
def find_image_by_monitor(self, item_id: str) -> BECImageItem:
"""
Find the widget by its gui_id.
@@ -507,6 +501,8 @@ class BECImageShow(BECPlotBase):
f"Monitor with ID '{monitor}' already exists in widget '{self.gui_id}'."
)
monitor = self.entry_validator.validate_monitor(monitor)
image_config = ImageItemConfig(
widget_class="BECImageItem",
parent_id=self.gui_id,
@@ -715,10 +711,8 @@ class BECImageShow(BECPlotBase):
processing_config = image_to_update.config.processing
self.processor.set_config(processing_config)
if self.use_threading:
print("using threaded version")
self._create_thread_worker(device, data)
else:
print("using NON-threaded version")
data = self.processor.process_image(data)
self.update_image(device, data)
@@ -785,18 +779,34 @@ class BECImageShow(BECPlotBase):
return True
return False
def _validate_monitor(self, monitor: str, validate_bec: bool = True):
"""
Validate the monitor name.
Args:
monitor(str): The name of the monitor.
validate_bec(bool): Whether to validate the monitor name with BEC.
Returns:
bool: True if the monitor name is valid, False otherwise.
"""
if not monitor or monitor == "":
return False
if validate_bec:
return monitor in self.dev
return True
def cleanup(self):
"""
Clean up the widget.
"""
print(f"Cleaning up {self.gui_id}")
# for monitor in self._images["device_monitor"]:
# self.bec_dispatcher.disconnect_slot(
# self.on_image_update, MessageEndpoints.device_monitor(monitor)
# )
# if self.thread is not None and self.thread.isRunning():
# self.thread.quit()
# self.thread.wait()
for monitor in self._images["device_monitor"]:
self.bec_dispatcher.disconnect_slot(
self.on_image_update, MessageEndpoints.device_monitor(monitor)
)
for image in self.images:
image.cleanup()
self.rpc_register.remove_rpc(self)
class ImageProcessor:

View File

@@ -182,14 +182,18 @@ class BECMotorMap(BECPlotBase):
"""
self.config.scatter_size = scatter_size
def _connect_motor_to_slots(self):
"""Connect motors to slots."""
def _disconnect_current_motors(self):
"""Disconnect the current motors from the slots."""
if self.motor_x is not None and self.motor_y is not None:
old_endpoints = [
endpoints = [
MessageEndpoints.device_readback(self.motor_x),
MessageEndpoints.device_readback(self.motor_y),
]
self.bec_dispatcher.disconnect_slot(self.on_device_readback, old_endpoints)
self.bec_dispatcher.disconnect_slot(self.on_device_readback, endpoints)
def _connect_motor_to_slots(self):
"""Connect motors to slots."""
self._disconnect_current_motors()
self.motor_x = self.config.signals.x.name
self.motor_y = self.config.signals.y.name
@@ -418,18 +422,7 @@ class BECMotorMap(BECPlotBase):
self.update_signal.emit()
if __name__ == "__main__": # pragma: no cover
import sys
import pyqtgraph as pg
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
glw = pg.GraphicsLayoutWidget()
motor_map = BECMotorMap()
motor_map.change_motors("samx", "samy")
glw.addItem(motor_map)
widget = glw
widget.show()
sys.exit(app.exec_())
def cleanup(self):
"""Cleanup the widget."""
self._disconnect_current_motors()
self.rpc_register.remove_rpc(self)

View File

@@ -243,6 +243,7 @@ class BECPlotBase(BECConnector, pg.GraphicsLayout):
def remove(self):
"""Remove the plot widget from the figure."""
if self.figure is not None:
self.cleanup()
self.figure.remove(widget_id=self.gui_id)
def cleanup(self):

View File

@@ -64,6 +64,8 @@ class Waveform1DConfig(WidgetConfig):
class BECCurve(BECConnector, pg.PlotDataItem):
USER_ACCESS = [
"remove",
"rpc_id",
"config_dict",
"set",
"set_data",
@@ -82,6 +84,7 @@ class BECCurve(BECConnector, pg.PlotDataItem):
name: Optional[str] = None,
config: Optional[CurveConfig] = None,
gui_id: Optional[str] = None,
parent_item: Optional[pg.PlotItem] = None,
**kwargs,
):
if config is None:
@@ -93,6 +96,7 @@ class BECCurve(BECConnector, pg.PlotDataItem):
super().__init__(config=config, gui_id=gui_id)
pg.PlotDataItem.__init__(self, name=name)
self.parent_item = parent_item
self.apply_config()
if kwargs:
self.set(**kwargs)
@@ -225,9 +229,19 @@ class BECCurve(BECConnector, pg.PlotDataItem):
x_data, y_data = self.getData()
return x_data, y_data
def cleanup(self):
"""Cleanup the curve."""
self.rpc_register.remove_rpc(self)
def remove(self):
"""Remove the curve from the plot."""
self.cleanup()
self.parent_item.removeItem(self)
class BECWaveform(BECPlotBase):
USER_ACCESS = [
"rpc_id",
"config_dict",
"add_curve_scan",
"add_curve_custom",
@@ -286,19 +300,6 @@ class BECWaveform(BECPlotBase):
self.add_legend()
self.apply_config(self.config)
def find_widget_by_id(self, item_id: str) -> BECCurve:
"""
Find the curve by its ID.
Args:
item_id(str): ID of the curve.
Returns:
BECCurve: The curve object.
"""
for curve in self.plot_item.curves:
if curve.gui_id == item_id:
return curve
def apply_config(self, config: dict | WidgetConfig, replot_last_scan: bool = False):
"""
Apply the configuration to the 1D waveform widget.
@@ -468,7 +469,7 @@ class BECWaveform(BECPlotBase):
Returns:
BECCurve: The curve object.
"""
curve = BECCurve(config=config, name=name)
curve = BECCurve(config=config, name=name, parent_item=self.plot_item)
self._curves_data[source][name] = curve
self.plot_item.addItem(curve)
self.config.curves[name] = curve.config
@@ -796,3 +797,6 @@ class BECWaveform(BECPlotBase):
def cleanup(self):
"""Cleanup the widget connection from BECDispatcher."""
self.bec_dispatcher.disconnect_slot(self.on_scan_segment, MessageEndpoints.scan_segment())
for curve in self.curves:
curve.cleanup()
self.rpc_register.remove_rpc(self)

View File

@@ -1,11 +1,10 @@
# pylint: disable= missing-module-docstring
from setuptools import find_packages, setup
__version__ = "0.46.6"
__version__ = "0.50.2"
# Default to PyQt6 if no other Qt binding is installed
QT_DEPENDENCY = "PyQt6>=6.0"
QSCINTILLA_DEPENDENCY = "PyQt6-QScintilla"
QT_DEPENDENCY = "PyQt6<=6.6.3"
# pylint: disable=unused-import
try:
@@ -14,15 +13,14 @@ except ImportError:
pass
else:
QT_DEPENDENCY = "PyQt5>=5.9"
QSCINTILLA_DEPENDENCY = "QScintilla"
if __name__ == "__main__":
setup(
install_requires=[
"pydantic",
"qtconsole",
"PyQt6-Qt6<=6.6.3",
QT_DEPENDENCY,
QSCINTILLA_DEPENDENCY,
"jedi",
"qtpy",
"pyqtgraph",
@@ -43,10 +41,16 @@ if __name__ == "__main__":
"isort",
],
"pyqt5": ["PyQt5>=5.9"],
"pyqt6": ["PyQt6>=6.0"],
"pyqt6": ["PyQt6<=6.6.3"],
},
version=__version__,
packages=find_packages(),
include_package_data=True,
package_data={"": ["*.ui", "*.yaml"]},
package_data={
"": [
"*.ui",
"*.yaml",
"*.png",
]
},
)

View File

View File

@@ -0,0 +1,25 @@
import pytest
from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.cli.server import BECWidgetsCLIServer
from bec_widgets.utils import BECDispatcher
@pytest.fixture(autouse=True)
def rpc_register():
yield RPCRegister()
RPCRegister.reset_singleton()
@pytest.fixture
def rpc_server(qtbot, bec_client_lib, threads_check):
dispatcher = BECDispatcher(client=bec_client_lib) # Has to init singleton with fixture client
server = BECWidgetsCLIServer(gui_id="figure")
qtbot.addWidget(server.fig)
qtbot.waitExposed(server.fig)
qtbot.wait(1000) # 1s long to wait until gui is ready
yield server
dispatcher.disconnect_all()
server.client.shutdown()
server.shutdown()
dispatcher.reset_singleton()

View File

@@ -3,22 +3,6 @@ import pytest
from bec_lib import MessageEndpoints
from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
from bec_widgets.cli.server import BECWidgetsCLIServer
from bec_widgets.utils import BECDispatcher
@pytest.fixture
def rpc_server(qtbot, bec_client_lib, threads_check):
dispatcher = BECDispatcher(client=bec_client_lib) # Has to init singleton with fixture client
server = BECWidgetsCLIServer(gui_id="id_test")
qtbot.addWidget(server.fig)
qtbot.waitExposed(server.fig)
qtbot.wait(1000) # 1s long to wait until gui is ready
yield server
dispatcher.disconnect_all()
server.client.shutdown()
server.shutdown()
dispatcher.reset_singleton()
def test_rpc_waveform1d_custom_curve(rpc_server, qtbot):

View File

@@ -0,0 +1,50 @@
import pytest
from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
def find_deepest_value(d: dict):
"""
Recursively find the deepest value in a dictionary
Args:
d(dict): Dictionary to search
Returns:
The deepest value in the dictionary.
"""
if isinstance(d, dict):
if d:
return find_deepest_value(next(iter(d.values())))
return d
def test_rpc_register_list_connections(rpc_server, rpc_register, qtbot):
fig = BECFigure(rpc_server.gui_id)
fig_server = rpc_server.fig
plt = fig.plot("samx", "bpm4i")
im = fig.image("eiger")
motor_map = fig.motor_map("samx", "samy")
plt_z = fig.add_plot("samx", "samy", "bpm4i")
all_connections = rpc_register.list_all_connections()
# Construct dict of all rpc items manually
all_subwidgets_expected = dict(fig_server.widgets)
curve_1D = find_deepest_value(fig_server.widgets[plt.rpc_id]._curves_data)
curve_2D = find_deepest_value(fig_server.widgets[plt_z.rpc_id]._curves_data)
curves_expected = {curve_1D.rpc_id: curve_1D, curve_2D.rpc_id: curve_2D}
fig_expected = {fig.rpc_id: fig_server}
image_item_expected = {
fig_server.widgets[im.rpc_id].images[0].rpc_id: fig_server.widgets[im.rpc_id].images[0]
}
all_connections_expected = {
**all_subwidgets_expected,
**curves_expected,
**fig_expected,
**image_item_expected,
}
assert len(all_connections) == 8
assert all_connections == all_connections_expected

View File

@@ -91,6 +91,7 @@ DEVICES = [
FakeDevice("bpm4i"),
FakeDevice("bpm3a"),
FakeDevice("bpm3i"),
FakeDevice("eiger"),
]

View File

@@ -1,8 +1,15 @@
import pytest
from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.utils import bec_dispatcher as bec_dispatcher_module
@pytest.fixture(autouse=True)
def rpc_register():
yield RPCRegister()
RPCRegister.reset_singleton()
@pytest.fixture(autouse=True)
def bec_dispatcher(threads_check):
bec_dispatcher = bec_dispatcher_module.BECDispatcher()

View File

@@ -1,4 +1,5 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import threading
import time
from unittest import mock
@@ -13,8 +14,9 @@ from bec_widgets.utils.bec_dispatcher import QtRedisConnector
@pytest.fixture
def bec_dispatcher_w_connector(bec_dispatcher, topics_msg_list):
def bec_dispatcher_w_connector(bec_dispatcher, topics_msg_list, send_msg_event):
def pubsub_msg_generator():
send_msg_event.wait()
for topic, msg in topics_msg_list:
yield {"channel": topic.encode(), "pattern": None, "data": msg}
while True:
@@ -33,6 +35,11 @@ def bec_dispatcher_w_connector(bec_dispatcher, topics_msg_list):
dummy_msg = MsgpackSerialization.dumps(ScanMessage(point_id=0, scan_id="0", data={}))
@pytest.fixture
def send_msg_event():
return threading.Event()
@pytest.mark.parametrize(
"topics_msg_list",
[
@@ -43,7 +50,7 @@ dummy_msg = MsgpackSerialization.dumps(ScanMessage(point_id=0, scan_id="0", data
)
],
)
def test_dispatcher_disconnect_all(bec_dispatcher_w_connector, qtbot):
def test_dispatcher_disconnect_all(bec_dispatcher_w_connector, qtbot, send_msg_event):
bec_dispatcher = bec_dispatcher_w_connector
cb1 = mock.Mock(spec=[])
cb2 = mock.Mock(spec=[])
@@ -53,7 +60,81 @@ def test_dispatcher_disconnect_all(bec_dispatcher_w_connector, qtbot):
bec_dispatcher.connect_slot(cb2, "topic2")
bec_dispatcher.connect_slot(cb2, "topic3")
assert len(bec_dispatcher.client.connector._topics_cb) == 3
send_msg_event.set()
qtbot.wait(10)
assert cb1.call_count == 2
assert cb2.call_count == 2
bec_dispatcher.disconnect_all()
assert len(bec_dispatcher.client.connector._topics_cb) == 0
@pytest.mark.parametrize(
"topics_msg_list",
[
(
("topic1", dummy_msg),
("topic2", dummy_msg),
)
],
)
def test_dispatcher_disconnect_one(bec_dispatcher_w_connector, qtbot, send_msg_event):
# test for BEC issue #276
bec_dispatcher = bec_dispatcher_w_connector
cb1 = mock.Mock(spec=[])
cb2 = mock.Mock(spec=[])
bec_dispatcher.connect_slot(cb1, "topic1")
bec_dispatcher.connect_slot(cb2, "topic2")
assert len(bec_dispatcher.client.connector._topics_cb) == 2
bec_dispatcher.disconnect_slot(cb1, "topic1")
assert len(bec_dispatcher.client.connector._topics_cb) == 1
send_msg_event.set()
qtbot.wait(10)
assert cb1.call_count == 0
cb2.assert_called_once()
@pytest.mark.parametrize("topics_msg_list", [(("topic1", dummy_msg),)])
def test_dispatcher_2_cb_same_topic(bec_dispatcher_w_connector, qtbot, send_msg_event):
# test for BEC issue #276
bec_dispatcher = bec_dispatcher_w_connector
cb1 = mock.Mock(spec=[])
cb2 = mock.Mock(spec=[])
bec_dispatcher.connect_slot(cb1, "topic1")
bec_dispatcher.connect_slot(cb2, "topic1")
assert len(bec_dispatcher.client.connector._topics_cb) == 1
bec_dispatcher.disconnect_slot(cb1, "topic1")
send_msg_event.set()
qtbot.wait(10)
assert cb1.call_count == 0
cb2.assert_called_once()
@pytest.mark.parametrize(
"topics_msg_list",
[
(
("topic1", dummy_msg),
("topic2", dummy_msg),
)
],
)
def test_dispatcher_2_topic_same_cb(bec_dispatcher_w_connector, qtbot, send_msg_event):
# test for BEC issue #276
bec_dispatcher = bec_dispatcher_w_connector
cb1 = mock.Mock(spec=[])
bec_dispatcher.connect_slot(cb1, "topic1")
bec_dispatcher.connect_slot(cb1, "topic2")
assert len(bec_dispatcher.client.connector._topics_cb) == 2
bec_dispatcher.disconnect_slot(cb1, "topic1")
assert len(bec_dispatcher.client.connector._topics_cb) == 1
send_msg_event.set()
qtbot.wait(10)
cb1.assert_called_once()

View File

@@ -1,170 +0,0 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import os
import tempfile
from unittest.mock import MagicMock, mock_open, patch
import pytest
from qtpy.Qsci import QsciScintilla
from qtpy.QtWidgets import QTextEdit
from bec_widgets.widgets.editor.editor import AutoCompleter, BECEditor
@pytest.fixture(scope="function")
def editor(qtbot, docstring_tooltip=False):
"""Helper function to set up the BECEditor widget."""
widget = BECEditor(toolbar_enabled=True, docstring_tooltip=docstring_tooltip)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def find_action_by_text(toolbar, text):
"""Helper function to find an action in the toolbar by its text."""
for action in toolbar.actions():
if action.text() == text:
return action
return None
def test_bec_editor_initialization(editor):
"""Test if the BECEditor widget is initialized correctly."""
assert isinstance(editor.editor, QsciScintilla)
assert isinstance(editor.terminal, QTextEdit)
assert isinstance(editor.auto_completer, AutoCompleter)
@patch("bec_widgets.widgets.editor.editor.Script") # Mock the Script class from jedi
def test_autocompleter_suggestions(mock_script, editor, qtbot):
"""Test if the autocompleter provides correct suggestions based on input."""
# Set up mock return values for the Script.complete method
mock_completion = MagicMock()
mock_completion.name = "mocked_method"
mock_script.return_value.complete.return_value = [mock_completion]
# Simulate user input in the editor
test_code = "print("
editor.editor.setText(test_code)
line, index = editor.editor.getCursorPosition()
# Trigger autocomplete
editor.auto_completer.get_completions(line, index, test_code)
# Use qtbot to wait for the completion thread
qtbot.waitUntil(lambda: editor.auto_completer.completions is not None, timeout=1000)
# Check if the expected completion is in the autocompleter's suggestions
suggested_methods = [completion.name for completion in editor.auto_completer.completions]
assert "mocked_method" in suggested_methods
@patch("bec_widgets.widgets.editor.editor.Script") # Mock the Script class from jedi
@pytest.mark.parametrize(
"docstring_enabled, expected_signature",
[(True, "Mocked signature with docstring"), (False, "Mocked signature")],
)
def test_autocompleter_signature(mock_script, editor, docstring_enabled, expected_signature):
"""Test if the autocompleter provides correct function signature based on docstring setting."""
# Set docstring mode based on parameter
editor.docstring_tooltip = docstring_enabled
editor.auto_completer.enable_docstring = docstring_enabled
# Set up mock return values for the Script.get_signatures method
mock_signature = MagicMock()
if docstring_enabled:
mock_signature.docstring.return_value = expected_signature
else:
mock_signature.to_string.return_value = expected_signature
mock_script.return_value.get_signatures.return_value = [mock_signature]
# Simulate user input that would trigger a signature request
test_code = "print("
editor.editor.setText(test_code)
line, index = editor.editor.getCursorPosition()
# Trigger signature request
signature = editor.auto_completer.get_function_signature(line, index, test_code)
# Check if the expected signature is returned
assert signature == expected_signature
def test_open_file(editor):
"""Test open_file method of BECEditor."""
# Create a temporary file with some content
with tempfile.NamedTemporaryFile(delete=False, suffix=".py") as temp_file:
temp_file.write(b"test file content")
# Mock user selecting the file in the dialog
with patch("qtpy.QtWidgets.QFileDialog.getOpenFileName", return_value=(temp_file.name, "")):
with patch("builtins.open", new_callable=mock_open, read_data="test file content"):
editor.open_file()
# Verify if the editor's text is set to the file content
assert editor.editor.text() == "test file content"
# Clean up by removing the temporary file
os.remove(temp_file.name)
def test_save_file(editor):
"""Test save_file method of BECEditor."""
# Set some text in the editor
editor.editor.setText("test save content")
# Mock user selecting the file in the dialog
with patch(
"qtpy.QtWidgets.QFileDialog.getSaveFileName", return_value=("/path/to/save/file.py", "")
):
with patch("builtins.open", new_callable=mock_open) as mock_file:
editor.save_file()
# Verify if the file was opened correctly for writing
mock_file.assert_called_with("/path/to/save/file.py", "w")
# Verify if the editor's text was written to the file
mock_file().write.assert_called_with("test save content")
def test_open_file_through_toolbar(editor):
"""Test the open_file method through the ModularToolBar."""
# Create a temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=".py") as temp_file:
temp_file.write(b"test file content")
# Find the open file action in the toolbar
open_action = find_action_by_text(editor.toolbar, "Open File")
assert open_action is not None, "Open File action should be found"
# Mock the file dialog and built-in open function
with patch("qtpy.QtWidgets.QFileDialog.getOpenFileName", return_value=(temp_file.name, "")):
with patch("builtins.open", new_callable=mock_open, read_data="test file content"):
open_action.trigger()
# Verify if the editor's text is set to the file content
assert editor.editor.text() == "test file content"
# Clean up
os.remove(temp_file.name)
def test_save_file_through_toolbar(editor):
"""Test the save_file method through the ModularToolBar."""
# Set some text in the editor
editor.editor.setText("test save content")
# Find the save file action in the toolbar
save_action = find_action_by_text(editor.toolbar, "Save File")
assert save_action is not None, "Save File action should be found"
# Mock the file dialog and built-in open function
with patch(
"qtpy.QtWidgets.QFileDialog.getSaveFileName", return_value=("/path/to/save/file.py", "")
):
with patch("builtins.open", new_callable=mock_open) as mock_file:
save_action.trigger()
# Verify if the file was opened correctly for writing
mock_file.assert_called_with("/path/to/save/file.py", "w")
# Verify if the editor's text was written to the file
mock_file().write.assert_called_with("test save content")

View File

@@ -0,0 +1,52 @@
from bec_widgets.cli.rpc_register import RPCRegister
class FakeObject:
def __init__(self, gui_id):
self.gui_id = gui_id
def test_add_connection(rpc_register):
obj1 = FakeObject("id1")
obj2 = FakeObject("id2")
rpc_register.add_rpc(obj1)
rpc_register.add_rpc(obj2)
all_connections = rpc_register.list_all_connections()
assert len(all_connections) == 2
assert all_connections["id1"] == obj1
assert all_connections["id2"] == obj2
def test_remove_connection(rpc_register):
obj1 = FakeObject("id1")
obj2 = FakeObject("id2")
rpc_register.add_rpc(obj1)
rpc_register.add_rpc(obj2)
rpc_register.remove_rpc(obj1)
all_connections = rpc_register.list_all_connections()
assert len(all_connections) == 1
assert all_connections["id2"] == obj2
def test_reset_singleton(rpc_register):
obj1 = FakeObject("id1")
obj2 = FakeObject("id2")
rpc_register.add_rpc(obj1)
rpc_register.add_rpc(obj2)
rpc_register.reset_singleton()
rpc_register = RPCRegister()
all_connections = rpc_register.list_all_connections()
assert len(all_connections) == 0
assert all_connections == {}