mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 03:31:50 +02:00
feat(utils/bec_dispatcher): BECDispatcher can register redis stream
This commit is contained in:
@ -69,36 +69,41 @@ class _BECDispatcher(QObject):
|
||||
if isinstance(topics, (str, EndpointInfo)):
|
||||
topics = [topics]
|
||||
|
||||
endpoints = [
|
||||
topic.endpoint if isinstance(topic, EndpointInfo) else topic for topic in topics
|
||||
]
|
||||
endpoint_to_consumer_type = {
|
||||
(topic.endpoint if isinstance(topic, EndpointInfo) else topic): (
|
||||
topic.message_op.name if isinstance(topic, EndpointInfo) else "SEND"
|
||||
)
|
||||
for topic in topics
|
||||
}
|
||||
|
||||
# consumer_types = [
|
||||
# topic.consumer_type if isinstance(topic, EndpointInfo) else "SET_PUBLISH"
|
||||
# for topic in topics
|
||||
# ]
|
||||
# Group topics by consumer type
|
||||
consumer_type_to_endpoints = {}
|
||||
for endpoint, consumer_type in endpoint_to_consumer_type.items():
|
||||
if consumer_type not in consumer_type_to_endpoints:
|
||||
consumer_type_to_endpoints[consumer_type] = []
|
||||
consumer_type_to_endpoints[consumer_type].append(endpoint)
|
||||
|
||||
# Ensure topics_key is a tuple, whether single_callback_for_all_topics is True or False.
|
||||
topics_key = (
|
||||
tuple(sorted(endpoints)) if single_callback_for_all_topics else tuple(endpoints)
|
||||
)
|
||||
for consumer_type, endpoints in consumer_type_to_endpoints.items():
|
||||
topics_key = (
|
||||
tuple(sorted(endpoints)) if single_callback_for_all_topics else tuple(endpoints)
|
||||
)
|
||||
|
||||
if topics_key not in self._connections:
|
||||
self._connections[topics_key] = self._create_connection(
|
||||
endpoints
|
||||
) # , consumer_type = )# add here consumer type
|
||||
connection = self._connections[topics_key]
|
||||
if slot not in connection.slots:
|
||||
connection.signal.connect(slot)
|
||||
connection.slots.add(slot)
|
||||
if topics_key not in self._connections:
|
||||
self._connections[topics_key] = self._create_connection(endpoints, consumer_type)
|
||||
connection = self._connections[topics_key]
|
||||
|
||||
def _create_connection(
|
||||
self, topics: list
|
||||
) -> _Connection: # , consumer_type: str) -> _Connection:
|
||||
if slot not in connection.slots:
|
||||
connection.signal.connect(slot)
|
||||
connection.slots.add(slot)
|
||||
|
||||
def _create_connection(self, topics: list, consumer_type: str) -> _Connection:
|
||||
"""Creates a new connection for given topics."""
|
||||
|
||||
def cb(msg):
|
||||
msg = msg.value
|
||||
if isinstance(msg, dict):
|
||||
msg = msg["data"]
|
||||
else:
|
||||
msg = msg.value
|
||||
for connection_key, connection in self._connections.items():
|
||||
if set(topics).intersection(connection_key):
|
||||
if isinstance(msg, list):
|
||||
@ -106,11 +111,10 @@ class _BECDispatcher(QObject):
|
||||
connection.signal.emit(msg.content, msg.metadata)
|
||||
|
||||
try:
|
||||
self.client.connector.register(topics=topics, cb=cb)
|
||||
# if consumer_type == "SET_PUBLISH":
|
||||
# self.client.connector.register(topics=topics, cb=cb)
|
||||
# elif consumer_type == "STREAM":
|
||||
# self.client.connector.register_stream(topics=topics, cb=cb, newest_only=True)
|
||||
if consumer_type == "STREAM":
|
||||
self.client.connector.register_stream(topics=topics, cb=cb, newest_only=True)
|
||||
else:
|
||||
self.client.connector.register(topics=topics, cb=cb)
|
||||
except redis.exceptions.ConnectionError:
|
||||
print("Could not connect to Redis, skipping registration of topics.")
|
||||
|
||||
@ -155,8 +159,6 @@ class _BECDispatcher(QObject):
|
||||
endpoints = [
|
||||
topic.endpoint if isinstance(topic, EndpointInfo) else topic for topic in topics
|
||||
]
|
||||
# if isinstance(topics, str):
|
||||
# topics = [topics]
|
||||
|
||||
for key, connection in list(self._connections.items()):
|
||||
if slot in connection.slots:
|
||||
|
Reference in New Issue
Block a user