mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-13 19:21:50 +02:00
fix(cli/rpc): rpc client can return any type of object + config dict of the widgets
This commit is contained in:
@ -1,35 +1,35 @@
|
||||
import pytest
|
||||
import threading
|
||||
|
||||
from bec_lib.bec_service import BECService
|
||||
from bec_widgets.utils import bec_dispatcher as bec_dispatcher_module
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def threads_check():
|
||||
current_threads = set(
|
||||
th
|
||||
for th in threading.enumerate()
|
||||
if "loguru" not in th.name and th is not threading.main_thread()
|
||||
)
|
||||
yield
|
||||
threads_after = set(
|
||||
th
|
||||
for th in threading.enumerate()
|
||||
if "loguru" not in th.name and th is not threading.main_thread()
|
||||
)
|
||||
additional_threads = threads_after - current_threads
|
||||
assert (
|
||||
len(additional_threads) == 0
|
||||
), f"Test creates {len(additional_threads)} threads that are not cleaned: {additional_threads}"
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def bec_dispatcher(threads_check):
|
||||
bec_dispatcher = bec_dispatcher_module.BECDispatcher()
|
||||
yield bec_dispatcher
|
||||
bec_dispatcher.disconnect_all()
|
||||
# clean BEC client
|
||||
BECService.shutdown(bec_dispatcher.client)
|
||||
# reinitialize singleton for next test
|
||||
bec_dispatcher_module._bec_dispatcher = None
|
||||
# import pytest
|
||||
# import threading
|
||||
#
|
||||
# from bec_lib.bec_service import BECService
|
||||
# from bec_widgets.utils import bec_dispatcher as bec_dispatcher_module
|
||||
#
|
||||
#
|
||||
# @pytest.fixture()
|
||||
# def threads_check():
|
||||
# current_threads = set(
|
||||
# th
|
||||
# for th in threading.enumerate()
|
||||
# if "loguru" not in th.name and th is not threading.main_thread()
|
||||
# )
|
||||
# yield
|
||||
# threads_after = set(
|
||||
# th
|
||||
# for th in threading.enumerate()
|
||||
# if "loguru" not in th.name and th is not threading.main_thread()
|
||||
# )
|
||||
# additional_threads = threads_after - current_threads
|
||||
# assert (
|
||||
# len(additional_threads) == 0
|
||||
# ), f"Test creates {len(additional_threads)} threads that are not cleaned: {additional_threads}"
|
||||
#
|
||||
#
|
||||
# @pytest.fixture(autouse=True)
|
||||
# def bec_dispatcher(threads_check):
|
||||
# bec_dispatcher = bec_dispatcher_module.BECDispatcher()
|
||||
# yield bec_dispatcher
|
||||
# bec_dispatcher.disconnect_all()
|
||||
# # clean BEC client
|
||||
# BECService.shutdown(bec_dispatcher.client)
|
||||
# # reinitialize singleton for next test
|
||||
# bec_dispatcher_module._bec_dispatcher = None
|
||||
|
Reference in New Issue
Block a user