diff --git a/bec_widgets/bec_dispatcher.py b/bec_widgets/bec_dispatcher.py index 96a9c937..1caa10fd 100644 --- a/bec_widgets/bec_dispatcher.py +++ b/bec_widgets/bec_dispatcher.py @@ -3,6 +3,7 @@ import itertools import os from dataclasses import dataclass from threading import RLock +from typing import Callable from bec_lib import BECClient from bec_lib.core import BECMessage, MessageEndpoints, ServiceConfig @@ -57,10 +58,7 @@ class _BECDispatcher(QObject): self.client.initialize(config=ServiceConfig(config_path=bec_config)) - self._slot_signal_map = { - "on_scan_segment": self.scan_segment, - "on_new_scan": self.new_scan, - } + self._slot_signal_map = {"on_scan_segment": self.scan_segment, "on_new_scan": self.new_scan} self._daps = {} self._connections = {} @@ -81,8 +79,7 @@ class _BECDispatcher(QObject): scan_segment_topic = MessageEndpoints.scan_segment() self._scan_segment_thread = self.client.connector.consumer( - topics=scan_segment_topic, - cb=_scan_segment_cb, + topics=scan_segment_topic, cb=_scan_segment_cb ) self._scan_segment_thread.start() @@ -92,12 +89,22 @@ class _BECDispatcher(QObject): if callable(slot): signal.connect(slot) - def connect_slot(self, slot, topic): + def connect_slot(self, slot: Callable, topic: str) -> None: + """Connect widget's pyqt slot, so that it is called on new pub/sub topic message + + Args: + slot (Callable): A slot method/function that accepts two inputs: content and metadata of + the corresponding pub/sub message + topic (str): A topic that can typically be acquired via bec_lib.core.MessageEndpoints + """ # create new connection for topic if it doesn't exist if topic not in self._connections: def cb(msg): msg = BECMessage.MessageReader.loads(msg.value) + # TODO: this can could be replaced with a simple + # self._connections[topic].signal.emit(msg.content, msg.metadata) + # once all dispatcher.connect_slot calls are made with a single topic only if not isinstance(msg, list): msg = [msg] for msg_i in msg: @@ -113,7 +120,14 @@ class _BECDispatcher(QObject): self._connections[topic].signal.connect(slot) self._connections[topic].slots.add(slot) - def disconnect_slot(self, slot, topic): + def disconnect_slot(self, slot: Callable, topic: str) -> None: + """Disconnect widget's pyqt slot from pub/sub updates on a topic. + + Args: + slot (Callable): A slot to be disconnected + topic (str): A corresponding topic that can typically be acquired via + bec_lib.core.MessageEndpoints + """ if topic not in self._connections: return