0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 11:41:49 +02:00

refactor: emit content and metadata from messages in connect_slot

This commit is contained in:
2023-08-17 15:12:14 +02:00
parent 68be2a0418
commit 78b666ffdb

View File

@ -3,6 +3,7 @@ import itertools
import os import os
from dataclasses import dataclass from dataclasses import dataclass
from threading import RLock from threading import RLock
from typing import Callable
from bec_lib import BECClient from bec_lib import BECClient
from bec_lib.core import BECMessage, MessageEndpoints, ServiceConfig from bec_lib.core import BECMessage, MessageEndpoints, ServiceConfig
@ -57,10 +58,7 @@ class _BECDispatcher(QObject):
self.client.initialize(config=ServiceConfig(config_path=bec_config)) self.client.initialize(config=ServiceConfig(config_path=bec_config))
self._slot_signal_map = { self._slot_signal_map = {"on_scan_segment": self.scan_segment, "on_new_scan": self.new_scan}
"on_scan_segment": self.scan_segment,
"on_new_scan": self.new_scan,
}
self._daps = {} self._daps = {}
self._connections = {} self._connections = {}
@ -81,8 +79,7 @@ class _BECDispatcher(QObject):
scan_segment_topic = MessageEndpoints.scan_segment() scan_segment_topic = MessageEndpoints.scan_segment()
self._scan_segment_thread = self.client.connector.consumer( self._scan_segment_thread = self.client.connector.consumer(
topics=scan_segment_topic, topics=scan_segment_topic, cb=_scan_segment_cb
cb=_scan_segment_cb,
) )
self._scan_segment_thread.start() self._scan_segment_thread.start()
@ -92,12 +89,22 @@ class _BECDispatcher(QObject):
if callable(slot): if callable(slot):
signal.connect(slot) signal.connect(slot)
def connect_slot(self, slot, topic): def connect_slot(self, slot: Callable, topic: str) -> None:
"""Connect widget's pyqt slot, so that it is called on new pub/sub topic message
Args:
slot (Callable): A slot method/function that accepts two inputs: content and metadata of
the corresponding pub/sub message
topic (str): A topic that can typically be acquired via bec_lib.core.MessageEndpoints
"""
# create new connection for topic if it doesn't exist # create new connection for topic if it doesn't exist
if topic not in self._connections: if topic not in self._connections:
def cb(msg): def cb(msg):
msg = BECMessage.MessageReader.loads(msg.value) msg = BECMessage.MessageReader.loads(msg.value)
# TODO: this can could be replaced with a simple
# self._connections[topic].signal.emit(msg.content, msg.metadata)
# once all dispatcher.connect_slot calls are made with a single topic only
if not isinstance(msg, list): if not isinstance(msg, list):
msg = [msg] msg = [msg]
for msg_i in msg: for msg_i in msg:
@ -113,7 +120,14 @@ class _BECDispatcher(QObject):
self._connections[topic].signal.connect(slot) self._connections[topic].signal.connect(slot)
self._connections[topic].slots.add(slot) self._connections[topic].slots.add(slot)
def disconnect_slot(self, slot, topic): def disconnect_slot(self, slot: Callable, topic: str) -> None:
"""Disconnect widget's pyqt slot from pub/sub updates on a topic.
Args:
slot (Callable): A slot to be disconnected
topic (str): A corresponding topic that can typically be acquired via
bec_lib.core.MessageEndpoints
"""
if topic not in self._connections: if topic not in self._connections:
return return