mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 11:41:49 +02:00
feat(connector): added threadpool wrapper
This commit is contained in:
@ -1,5 +1,9 @@
|
||||
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from qtpy.QtCore import Slot
|
||||
from qtpy.QtWidgets import QApplication
|
||||
|
||||
from bec_widgets.utils import BECConnector, ConnectionConfig
|
||||
|
||||
@ -55,3 +59,22 @@ def test_bec_connector_update_client(bec_connector, mocked_client):
|
||||
def test_bec_connector_get_config(bec_connector):
|
||||
assert bec_connector.get_config(dict_output=False) == bec_connector.config
|
||||
assert bec_connector.get_config() == bec_connector.config.model_dump()
|
||||
|
||||
|
||||
def test_bec_connector_submit_task(bec_connector):
|
||||
def test_func():
|
||||
time.sleep(2)
|
||||
print("done")
|
||||
|
||||
completed = False
|
||||
|
||||
@Slot()
|
||||
def complete_func():
|
||||
nonlocal completed
|
||||
completed = True
|
||||
|
||||
bec_connector.submit_task(test_func, on_complete=complete_func)
|
||||
assert not completed
|
||||
while not completed:
|
||||
QApplication.processEvents()
|
||||
time.sleep(0.1)
|
||||
|
Reference in New Issue
Block a user