1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-03-04 16:02:51 +01:00

refactor(bec_connector): replace pyqtSlot with SafeSlot for consistency

This commit is contained in:
2025-04-23 14:50:28 +02:00
parent 6318b2d822
commit 9d6d0b406a

View File

@@ -14,8 +14,7 @@ from qtpy.QtCore import QObject, QRunnable, QThreadPool, QTimer, Signal
from qtpy.QtWidgets import QApplication
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.error_popups import ErrorPopupUtility
from bec_widgets.utils.error_popups import SafeSlot as pyqtSlot
from bec_widgets.utils.error_popups import ErrorPopupUtility, SafeSlot
from bec_widgets.utils.widget_io import WidgetHierarchy
from bec_widgets.utils.yaml_dialog import load_yaml, load_yaml_gui, save_yaml, save_yaml_gui
@@ -108,7 +107,7 @@ class BECConnector:
if not self.client in BECConnector.EXIT_HANDLERS:
# register function to clean connections at exit;
# the function depends on BECClient, and BECDispatcher
@pyqtSlot()
@SafeSlot()
def terminate(client=self.client, dispatcher=self.bec_dispatcher):
logger.info("Disconnecting", repr(dispatcher))
dispatcher.disconnect_all()
@@ -231,7 +230,7 @@ class BECConnector:
if self.rpc_register.object_is_registered(self):
self.rpc_register.broadcast()
def submit_task(self, fn, *args, on_complete: pyqtSlot = None, **kwargs) -> Worker:
def submit_task(self, fn, *args, on_complete: SafeSlot = None, **kwargs) -> Worker:
"""
Submit a task to run in a separate thread. The task will run the specified
function with the provided arguments and emit the completed signal when done.
@@ -359,7 +358,7 @@ class BECConnector:
file_path = os.path.join(path, f"{self.__class__.__name__}_config.yaml")
save_yaml(file_path, self._config_dict)
# @pyqtSlot(str)
# @SafeSlot(str)
def _set_gui_id(self, gui_id: str) -> None:
"""
Set the GUI ID for the widget.
@@ -391,7 +390,7 @@ class BECConnector:
self.client = client
self.get_bec_shortcuts()
@pyqtSlot(ConnectionConfig) # TODO can be also dict
@SafeSlot(ConnectionConfig) # TODO can be also dict
def on_config_update(self, config: ConnectionConfig | dict) -> None:
"""
Update the configuration for the widget.