diff --git a/bec_widgets/utils/bec_connector.py b/bec_widgets/utils/bec_connector.py index 0eae5f57..97aba79d 100644 --- a/bec_widgets/utils/bec_connector.py +++ b/bec_widgets/utils/bec_connector.py @@ -2,10 +2,11 @@ from __future__ import annotations import time -from typing import Optional, Type +from typing import Optional -from bec_lib.utils.import_utils import lazy_import, lazy_import_from +from bec_lib.utils.import_utils import lazy_import_from from pydantic import BaseModel, Field, field_validator +from qtpy.QtCore import QObject, QRunnable, QThreadPool, Signal from qtpy.QtCore import Slot as pyqtSlot from bec_widgets.cli.rpc_register import RPCRegister @@ -33,6 +34,31 @@ class ConnectionConfig(BaseModel): return v +class WorkerSignals(QObject): + progress = Signal(dict) + completed = Signal() + + +class Worker(QRunnable): + """ + Worker class to run a function in a separate thread. + """ + + def __init__(self, func, *args, **kwargs): + super().__init__() + self.signals = WorkerSignals() + self.func = func + self.args = args + self.kwargs = kwargs + + def run(self): + """ + Run the specified function in the thread. + """ + self.func(*self.args, **self.kwargs) + self.signals.completed.emit() + + class BECConnector: """Connection mixin class for all BEC widgets, to handle BEC client and device manager""" @@ -63,6 +89,43 @@ class BECConnector: self.rpc_register = RPCRegister() self.rpc_register.add_rpc(self) + self._thread_pool = QThreadPool.globalInstance() + + def submit_task(self, fn, *args, on_complete: pyqtSlot = None, **kwargs) -> Worker: + """ + Submit a task to run in a separate thread. The task will run the specified + function with the provided arguments and emit the completed signal when done. + + Use this method if you want to wait for a task to complete without blocking the + main thread. + + Args: + fn: Function to run in a separate thread. + *args: Arguments for the function. + on_complete: Slot to run when the task is complete. + **kwargs: Keyword arguments for the function. + + Returns: + worker: The worker object that will run the task. + + Examples: + >>> def my_function(a, b): + >>> print(a + b) + >>> self.submit_task(my_function, 1, 2) + + >>> def my_function(a, b): + >>> print(a + b) + >>> def on_complete(): + >>> print("Task complete") + >>> self.submit_task(my_function, 1, 2, on_complete=on_complete) + + """ + worker = Worker(fn, *args, **kwargs) + if on_complete: + worker.signals.completed.connect(on_complete) + self._thread_pool.start(worker) + return worker + def get_all_rpc(self) -> dict: """Get all registered RPC objects.""" all_connections = self.rpc_register.list_all_connections() diff --git a/tests/unit_tests/test_bec_connector.py b/tests/unit_tests/test_bec_connector.py index 888f0427..a8446886 100644 --- a/tests/unit_tests/test_bec_connector.py +++ b/tests/unit_tests/test_bec_connector.py @@ -1,5 +1,9 @@ # pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring +import time + import pytest +from qtpy.QtCore import Slot +from qtpy.QtWidgets import QApplication from bec_widgets.utils import BECConnector, ConnectionConfig @@ -55,3 +59,22 @@ def test_bec_connector_update_client(bec_connector, mocked_client): def test_bec_connector_get_config(bec_connector): assert bec_connector.get_config(dict_output=False) == bec_connector.config assert bec_connector.get_config() == bec_connector.config.model_dump() + + +def test_bec_connector_submit_task(bec_connector): + def test_func(): + time.sleep(2) + print("done") + + completed = False + + @Slot() + def complete_func(): + nonlocal completed + completed = True + + bec_connector.submit_task(test_func, on_complete=complete_func) + assert not completed + while not completed: + QApplication.processEvents() + time.sleep(0.1)