mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 03:31:50 +02:00
fix(bec_dispatcher): adapt code to redis connector refactoring
This commit is contained in:
@ -7,7 +7,7 @@ from collections.abc import Callable
|
||||
from typing import Union
|
||||
|
||||
from bec_lib import BECClient, ServiceConfig
|
||||
from bec_lib.redis_connector import RedisConsumerThreaded
|
||||
from bec_lib.connector import ConnectorBase
|
||||
from qtpy.QtCore import QObject, Signal as pyqtSignal
|
||||
|
||||
# Adding a new pyqt signal requires a class factory, as they must be part of the class definition
|
||||
@ -18,10 +18,11 @@ _signal_class_factory = (
|
||||
|
||||
|
||||
class _Connection:
|
||||
"""Utility class to keep track of slots connected to a particular redis consumer"""
|
||||
"""Utility class to keep track of slots connected to a particular redis connector"""
|
||||
|
||||
def __init__(self, callback) -> None:
|
||||
self.callback = callback
|
||||
|
||||
def __init__(self, consumer) -> None:
|
||||
self.consumer: RedisConsumerThreaded = consumer
|
||||
self.slots = set()
|
||||
# keep a reference to a new signal class, so it is not gc'ed
|
||||
self._signal_container = next(_signal_class_factory)()
|
||||
@ -29,7 +30,7 @@ class _Connection:
|
||||
|
||||
|
||||
class _BECDispatcher(QObject):
|
||||
"""Utility class to keep track of slots connected to a particular redis consumer"""
|
||||
"""Utility class to keep track of slots connected to a particular redis connector"""
|
||||
|
||||
def __init__(self, bec_config=None):
|
||||
super().__init__()
|
||||
@ -73,18 +74,13 @@ class _BECDispatcher(QObject):
|
||||
|
||||
def cb(msg):
|
||||
msg = msg.value
|
||||
if not isinstance(msg, list):
|
||||
msg = [msg]
|
||||
for msg_i in msg:
|
||||
for connection_key, connection in self._connections.items():
|
||||
if set(topics).intersection(
|
||||
connection_key if isinstance(connection_key, tuple) else [connection_key]
|
||||
):
|
||||
connection.signal.emit(msg_i.content, msg_i.metadata)
|
||||
for connection_key, connection in self._connections.items():
|
||||
if set(topics).intersection(connection_key):
|
||||
connection.signal.emit(msg.content, msg.metadata)
|
||||
|
||||
consumer = self.client.connector.consumer(topics=topics, cb=cb)
|
||||
consumer.start()
|
||||
return _Connection(consumer)
|
||||
self.client.connector.register(topics=topics, cb=cb)
|
||||
|
||||
return _Connection(cb)
|
||||
|
||||
def _do_disconnect_slot(self, topic, slot):
|
||||
print(f"Disconnecting {slot} from {topic}")
|
||||
@ -96,9 +92,6 @@ class _BECDispatcher(QObject):
|
||||
print("Continue to remove slot:'{slot}' from 'connection.slots'.")
|
||||
connection.slots.remove(slot)
|
||||
if not connection.slots:
|
||||
print(f"{connection.consumer} is shutting down")
|
||||
connection.consumer.shutdown()
|
||||
connection.consumer.join()
|
||||
del self._connections[topic]
|
||||
|
||||
def _disconnect_slot_from_topic(self, slot: Callable, topic: str) -> None:
|
||||
|
@ -6,25 +6,24 @@ from bec_lib.messages import ScanMessage
|
||||
from bec_lib.connector import MessageObject
|
||||
|
||||
|
||||
msg = MessageObject(topic="", value=ScanMessage(point_id=0, scanID=0, data={}).dumps())
|
||||
msg = MessageObject(topic="", value=ScanMessage(point_id=0, scanID=0, data={}))
|
||||
|
||||
|
||||
@pytest.fixture(name="consumer")
|
||||
def _consumer(bec_dispatcher):
|
||||
bec_dispatcher.client.connector.consumer = Mock()
|
||||
consumer = bec_dispatcher.client.connector.consumer
|
||||
yield consumer
|
||||
bec_dispatcher.client.connector = Mock()
|
||||
yield bec_dispatcher.client.connector
|
||||
|
||||
|
||||
@pytest.mark.filterwarnings("ignore:Failed to connect to redis.")
|
||||
def test_connect_one_slot(bec_dispatcher, consumer):
|
||||
slot1 = Mock()
|
||||
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
|
||||
consumer.assert_called_once()
|
||||
consumer.register.assert_called_once()
|
||||
# trigger consumer callback as if a message was published
|
||||
consumer.call_args.kwargs["cb"](msg)
|
||||
consumer.register.call_args.kwargs["cb"](msg)
|
||||
slot1.assert_called_once()
|
||||
consumer.call_args.kwargs["cb"](msg)
|
||||
consumer.register.call_args.kwargs["cb"](msg)
|
||||
assert slot1.call_count == 2
|
||||
|
||||
|
||||
@ -32,23 +31,23 @@ def test_connect_identical(bec_dispatcher, consumer):
|
||||
slot1 = Mock()
|
||||
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
|
||||
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
|
||||
consumer.assert_called_once()
|
||||
consumer.register.assert_called_once()
|
||||
|
||||
consumer.call_args.kwargs["cb"](msg)
|
||||
consumer.register.call_args.kwargs["cb"](msg)
|
||||
slot1.assert_called_once()
|
||||
|
||||
|
||||
def test_connect_many_slots_one_topic(bec_dispatcher, consumer):
|
||||
slot1, slot2 = Mock(), Mock()
|
||||
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
|
||||
consumer.assert_called_once()
|
||||
consumer.register.assert_called_once()
|
||||
bec_dispatcher.connect_slot(slot=slot2, topics="topic0")
|
||||
consumer.assert_called_once()
|
||||
consumer.register.assert_called_once()
|
||||
# trigger consumer callback as if a message was published
|
||||
consumer.call_args.kwargs["cb"](msg)
|
||||
consumer.register.call_args.kwargs["cb"](msg)
|
||||
slot1.assert_called_once()
|
||||
slot2.assert_called_once()
|
||||
consumer.call_args.kwargs["cb"](msg)
|
||||
consumer.register.call_args.kwargs["cb"](msg)
|
||||
assert slot1.call_count == 2
|
||||
assert slot2.call_count == 2
|
||||
|
||||
@ -56,13 +55,13 @@ def test_connect_many_slots_one_topic(bec_dispatcher, consumer):
|
||||
def test_connect_one_slot_many_topics(bec_dispatcher, consumer):
|
||||
slot1 = Mock()
|
||||
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
|
||||
assert consumer.call_count == 1
|
||||
assert consumer.register.call_count == 1
|
||||
bec_dispatcher.connect_slot(slot=slot1, topics="topic1")
|
||||
assert consumer.call_count == 2
|
||||
assert consumer.register.call_count == 2
|
||||
# trigger consumer callback as if a message was published
|
||||
consumer.call_args_list[0].kwargs["cb"](msg)
|
||||
consumer.register.call_args_list[0].kwargs["cb"](msg)
|
||||
slot1.assert_called_once()
|
||||
consumer.call_args_list[1].kwargs["cb"](msg)
|
||||
consumer.register.call_args_list[1].kwargs["cb"](msg)
|
||||
assert slot1.call_count == 2
|
||||
|
||||
|
||||
@ -72,19 +71,19 @@ def test_disconnect_one_slot_one_topic(bec_dispatcher, consumer):
|
||||
|
||||
# disconnect using a different topic
|
||||
bec_dispatcher.disconnect_slot(slot=slot1, topics="topic1")
|
||||
consumer.call_args.kwargs["cb"](msg)
|
||||
consumer.register.call_args.kwargs["cb"](msg)
|
||||
assert slot1.call_count == 1
|
||||
|
||||
# disconnect using a different slot
|
||||
bec_dispatcher.disconnect_slot(slot=slot2, topics="topic0")
|
||||
consumer.call_args.kwargs["cb"](msg)
|
||||
consumer.register.call_args.kwargs["cb"](msg)
|
||||
assert slot1.call_count == 2
|
||||
|
||||
# disconnect using the right slot and topics
|
||||
bec_dispatcher.disconnect_slot(slot=slot1, topics="topic0")
|
||||
# reset count to 0 for slot
|
||||
# reset count to for slot
|
||||
slot1.reset_mock()
|
||||
consumer.call_args.kwargs["cb"](msg)
|
||||
consumer.register.call_args.kwargs["cb"](msg)
|
||||
assert slot1.call_count == 0
|
||||
|
||||
|
||||
@ -95,14 +94,14 @@ def test_disconnect_identical(bec_dispatcher, consumer):
|
||||
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
|
||||
|
||||
# Test to call the slot once (slot should be not connected twice)
|
||||
consumer.call_args.kwargs["cb"](msg)
|
||||
consumer.register.call_args.kwargs["cb"](msg)
|
||||
assert slot1.call_count == 1
|
||||
|
||||
# Disconnect the slot
|
||||
bec_dispatcher.disconnect_slot(slot=slot1, topics="topic0")
|
||||
|
||||
# Test to call the slot once (slot should be not connected anymore), count remains 1
|
||||
consumer.call_args.kwargs["cb"](msg)
|
||||
consumer.register.call_args.kwargs["cb"](msg)
|
||||
assert slot1.call_count == 1
|
||||
|
||||
|
||||
@ -113,19 +112,19 @@ def test_disconnect_many_slots_one_topic(bec_dispatcher, consumer):
|
||||
|
||||
# disconnect using a different slot
|
||||
bec_dispatcher.disconnect_slot(slot3, topics="topic0")
|
||||
consumer.call_args.kwargs["cb"](msg)
|
||||
consumer.register.call_args.kwargs["cb"](msg)
|
||||
assert slot1.call_count == 1
|
||||
assert slot2.call_count == 1
|
||||
|
||||
# disconnect using a different topics
|
||||
bec_dispatcher.disconnect_slot(slot1, topics="topic1")
|
||||
consumer.call_args.kwargs["cb"](msg)
|
||||
consumer.register.call_args.kwargs["cb"](msg)
|
||||
assert slot1.call_count == 2
|
||||
assert slot2.call_count == 2
|
||||
|
||||
# disconnect using the right slot and topics
|
||||
bec_dispatcher.disconnect_slot(slot1, topics="topic0")
|
||||
consumer.call_args.kwargs["cb"](msg)
|
||||
consumer.register.call_args.kwargs["cb"](msg)
|
||||
assert slot1.call_count == 2
|
||||
assert slot2.call_count == 3
|
||||
|
||||
@ -137,31 +136,31 @@ def test_disconnect_one_slot_many_topics(bec_dispatcher, consumer):
|
||||
|
||||
# disconnect using a different slot
|
||||
bec_dispatcher.disconnect_slot(slot=slot2, topics="topic0")
|
||||
consumer.call_args_list[0].kwargs["cb"](msg)
|
||||
consumer.register.call_args_list[0].kwargs["cb"](msg)
|
||||
assert slot1.call_count == 1
|
||||
consumer.call_args_list[1].kwargs["cb"](msg)
|
||||
consumer.register.call_args_list[1].kwargs["cb"](msg)
|
||||
assert slot1.call_count == 2
|
||||
|
||||
# disconnect using a different topics
|
||||
bec_dispatcher.disconnect_slot(slot=slot1, topics="topic3")
|
||||
consumer.call_args_list[0].kwargs["cb"](msg)
|
||||
consumer.register.call_args_list[0].kwargs["cb"](msg)
|
||||
assert slot1.call_count == 3
|
||||
consumer.call_args_list[1].kwargs["cb"](msg)
|
||||
consumer.register.call_args_list[1].kwargs["cb"](msg)
|
||||
assert slot1.call_count == 4
|
||||
|
||||
# disconnect using the right slot and topics
|
||||
bec_dispatcher.disconnect_slot(slot=slot1, topics="topic0")
|
||||
# Calling disconnected topic0 should not call slot1
|
||||
consumer.call_args_list[0].kwargs["cb"](msg)
|
||||
consumer.register.call_args_list[0].kwargs["cb"](msg)
|
||||
assert slot1.call_count == 4
|
||||
# Calling topic1 should still call slot1
|
||||
consumer.call_args_list[1].kwargs["cb"](msg)
|
||||
consumer.register.call_args_list[1].kwargs["cb"](msg)
|
||||
assert slot1.call_count == 5
|
||||
|
||||
# disconnect remaining topic1 from slot1, calling any topic should not increase count
|
||||
bec_dispatcher.disconnect_slot(slot=slot1, topics="topic1")
|
||||
consumer.call_args_list[0].kwargs["cb"](msg)
|
||||
consumer.call_args_list[1].kwargs["cb"](msg)
|
||||
consumer.register.call_args_list[0].kwargs["cb"](msg)
|
||||
consumer.register.call_args_list[1].kwargs["cb"](msg)
|
||||
assert slot1.call_count == 5
|
||||
|
||||
|
||||
@ -178,9 +177,9 @@ def test_disconnect_all(bec_dispatcher, consumer):
|
||||
bec_dispatcher.disconnect_all()
|
||||
|
||||
# Simulate messages and verify that none of the slots are called
|
||||
consumer.call_args_list[0].kwargs["cb"](msg)
|
||||
consumer.call_args_list[1].kwargs["cb"](msg)
|
||||
consumer.call_args_list[2].kwargs["cb"](msg)
|
||||
consumer.register.call_args_list[0].kwargs["cb"](msg)
|
||||
consumer.register.call_args_list[1].kwargs["cb"](msg)
|
||||
consumer.register.call_args_list[2].kwargs["cb"](msg)
|
||||
|
||||
# Ensure that the slots have not been called
|
||||
assert slot1.call_count == 0
|
||||
@ -209,13 +208,13 @@ def test_connect_one_slot_multiple_topics_single_callback(bec_dispatcher, consum
|
||||
msg_with_topic = MessageObject(
|
||||
topic=topic, value=ScanMessage(point_id=0, scanID=0, data={}).dumps()
|
||||
)
|
||||
consumer.call_args.kwargs["cb"](msg_with_topic)
|
||||
consumer.register.call_args.kwargs["cb"](msg_with_topic)
|
||||
|
||||
# Verify that the slot is called once for each topic
|
||||
assert slot1.call_count == len(topics)
|
||||
|
||||
# Verify that a single consumer is created for all topics
|
||||
consumer.assert_called_once()
|
||||
consumer.register.assert_called_once()
|
||||
|
||||
|
||||
def test_disconnect_all_with_single_callback_for_multiple_topics(bec_dispatcher, consumer):
|
||||
@ -237,5 +236,5 @@ def test_disconnect_all_with_single_callback_for_multiple_topics(bec_dispatcher,
|
||||
assert slot1.call_count == 0 # Slot has not been called
|
||||
|
||||
# Simulate messages and verify that the slot is not called
|
||||
consumer.call_args.kwargs["cb"](msg)
|
||||
consumer.register.call_args.kwargs["cb"](msg)
|
||||
assert slot1.call_count == 0 # Slot has not been called
|
||||
|
@ -80,7 +80,7 @@ def mocked_client():
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def monitor(qtbot, mocked_client):
|
||||
def monitor(bec_dispatcher, qtbot, mocked_client):
|
||||
# client = MagicMock()
|
||||
widget = BECMonitor(client=mocked_client)
|
||||
qtbot.addWidget(widget)
|
||||
|
Reference in New Issue
Block a user