mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 03:31:50 +02:00
fix: bec_dispatcher.py can connect multiple topics to one callback slot
This commit is contained in:
@ -1,3 +1,4 @@
|
||||
# pylint: disable=missing-function-docstring
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
@ -189,3 +190,52 @@ def test_disconnect_all(bec_dispatcher, consumer):
|
||||
assert "topic0" not in bec_dispatcher._connections
|
||||
assert "topic1" not in bec_dispatcher._connections
|
||||
assert "topic2" not in bec_dispatcher._connections
|
||||
|
||||
|
||||
def test_connect_one_slot_multiple_topics_single_callback(bec_dispatcher, consumer):
|
||||
slot1 = Mock()
|
||||
|
||||
# Connect the slot to multiple topics using a single callback
|
||||
topics = ["topic1", "topic2"]
|
||||
bec_dispatcher.connect_slot(slot=slot1, topics=topics, single_callback_for_all_topics=True)
|
||||
|
||||
# Verify the initial state
|
||||
assert len(bec_dispatcher._connections) == 1 # One connection for all topics
|
||||
assert len(bec_dispatcher._connections[tuple(sorted(topics))].slots) == 1 # One slot connected
|
||||
|
||||
# Simulate messages being published on each topic
|
||||
for topic in topics:
|
||||
msg_with_topic = MessageObject(
|
||||
topic=topic, value=ScanMessage(point_id=0, scanID=0, data={}).dumps()
|
||||
)
|
||||
consumer.call_args.kwargs["cb"](msg_with_topic)
|
||||
|
||||
# Verify that the slot is called once for each topic
|
||||
assert slot1.call_count == len(topics)
|
||||
|
||||
# Verify that a single consumer is created for all topics
|
||||
consumer.assert_called_once()
|
||||
|
||||
|
||||
def test_disconnect_all_with_single_callback_for_multiple_topics(bec_dispatcher, consumer):
|
||||
slot1 = Mock()
|
||||
|
||||
# Connect the slot to multiple topics using a single callback
|
||||
topics = ["topic1", "topic2"]
|
||||
bec_dispatcher.connect_slot(slot=slot1, topics=topics, single_callback_for_all_topics=True)
|
||||
|
||||
# Verify the initial state
|
||||
assert len(bec_dispatcher._connections) == 1 # One connection for all topics
|
||||
assert len(bec_dispatcher._connections[tuple(sorted(topics))].slots) == 1 # One slot connected
|
||||
|
||||
# Call disconnect_all method
|
||||
bec_dispatcher.disconnect_all()
|
||||
|
||||
# Verify that the slot is disconnected
|
||||
assert len(bec_dispatcher._connections) == 0 # All connections are removed
|
||||
assert slot1.call_count == 0 # Slot has not been called
|
||||
|
||||
# Simulate messages and verify that the slot is not called
|
||||
msg = MessageObject(topic="topic1", value=ScanMessage(point_id=0, scanID=0, data={}).dumps())
|
||||
with pytest.raises(KeyError):
|
||||
consumer.call_args.kwargs["cb"](msg)
|
||||
|
Reference in New Issue
Block a user