mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 11:41:49 +02:00
refactor(utils/bec_dispatcher): new singleton definition
This commit is contained in:
@ -31,10 +31,22 @@ class _Connection:
|
||||
self.signal: pyqtSignal = self._signal_container.signal
|
||||
|
||||
|
||||
class _BECDispatcher(QObject):
|
||||
class BECDispatcher(QObject):
|
||||
"""Utility class to keep track of slots connected to a particular redis connector"""
|
||||
|
||||
_instance = None
|
||||
_initialized = False
|
||||
|
||||
def __new__(cls, client=None, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
cls._instance = super(BECDispatcher, cls).__new__(cls)
|
||||
cls._initialized = False
|
||||
return cls._instance
|
||||
|
||||
def __init__(self, client=None):
|
||||
if self._initialized:
|
||||
return
|
||||
|
||||
super().__init__()
|
||||
self.client = BECClient() if client is None else client
|
||||
try:
|
||||
@ -44,6 +56,13 @@ class _BECDispatcher(QObject):
|
||||
|
||||
self._connections = {}
|
||||
|
||||
self._initialized = True
|
||||
|
||||
@classmethod
|
||||
def reset_singleton(cls):
|
||||
cls._instance = None
|
||||
cls._initialized = False
|
||||
|
||||
def connect_slot(
|
||||
self,
|
||||
slot: Callable,
|
||||
@ -170,18 +189,3 @@ class _BECDispatcher(QObject):
|
||||
for key, connection in list(self._connections.items()):
|
||||
for slot in list(connection.slots):
|
||||
self._disconnect_slot_from_topic(slot, key)
|
||||
|
||||
|
||||
# variable holding the Singleton instance of BECDispatcher
|
||||
_bec_dispatcher = None
|
||||
|
||||
|
||||
def BECDispatcher():
|
||||
global _bec_dispatcher
|
||||
if _bec_dispatcher is None:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--bec-client", default=None)
|
||||
args, _ = parser.parse_known_args()
|
||||
|
||||
_bec_dispatcher = _BECDispatcher(args.bec_client)
|
||||
return _bec_dispatcher
|
||||
|
@ -33,4 +33,4 @@ def bec_dispatcher(threads_check):
|
||||
# clean BEC client
|
||||
bec_dispatcher.client.shutdown()
|
||||
# reinitialize singleton for next test
|
||||
bec_dispatcher_module._bec_dispatcher = None
|
||||
bec_dispatcher_module.BECDispatcher.reset_singleton()
|
||||
|
Reference in New Issue
Block a user