mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 11:41:49 +02:00
test(utils/bec_dispatcher): tests fixed
This commit is contained in:
@ -1,239 +1,58 @@
|
|||||||
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
|
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
|
||||||
from unittest.mock import Mock
|
import time
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
import redis
|
||||||
from bec_lib.connector import MessageObject
|
from bec_lib.connector import MessageObject
|
||||||
from bec_lib.messages import ScanMessage
|
from bec_lib.messages import ScanMessage
|
||||||
|
from bec_lib.redis_connector import RedisConnector
|
||||||
msg = MessageObject(topic="", value=ScanMessage(point_id=0, scan_id="scan_id", data={}))
|
from bec_lib.serialization import MsgpackSerialization
|
||||||
|
from bec_widgets.utils.bec_dispatcher import QtRedisConnector
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(name="consumer")
|
@pytest.fixture
|
||||||
def _consumer(bec_dispatcher):
|
def bec_dispatcher_w_connector(bec_dispatcher, topics_msg_list):
|
||||||
bec_dispatcher.client.connector = Mock()
|
def pubsub_msg_generator():
|
||||||
yield bec_dispatcher.client.connector
|
for topic, msg in topics_msg_list:
|
||||||
|
yield {"channel": topic.encode(), "pattern": None, "data": msg}
|
||||||
|
while True:
|
||||||
|
time.sleep(0.2)
|
||||||
|
yield StopIteration
|
||||||
|
|
||||||
|
with mock.patch("redis.Redis"):
|
||||||
|
pubsub = redis.Redis().pubsub()
|
||||||
|
messages = pubsub_msg_generator()
|
||||||
|
pubsub.get_message.side_effect = lambda timeout: next(messages)
|
||||||
|
connector = QtRedisConnector("localhost:1")
|
||||||
|
bec_dispatcher.client.connector = connector
|
||||||
|
yield bec_dispatcher
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.filterwarnings("ignore:Failed to connect to redis.")
|
dummy_msg = MsgpackSerialization.dumps(ScanMessage(point_id=0, scan_id=0, data={}))
|
||||||
def test_connect_one_slot(bec_dispatcher, consumer):
|
|
||||||
slot1 = Mock()
|
|
||||||
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
|
|
||||||
consumer.register.assert_called_once()
|
|
||||||
# trigger consumer callback as if a message was published
|
|
||||||
consumer.register.call_args.kwargs["cb"](msg)
|
|
||||||
slot1.assert_called_once()
|
|
||||||
consumer.register.call_args.kwargs["cb"](msg)
|
|
||||||
assert slot1.call_count == 2
|
|
||||||
|
|
||||||
|
|
||||||
def test_connect_identical(bec_dispatcher, consumer):
|
@pytest.mark.parametrize(
|
||||||
slot1 = Mock()
|
"topics_msg_list",
|
||||||
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
|
[
|
||||||
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
|
(
|
||||||
consumer.register.assert_called_once()
|
("topic1", dummy_msg),
|
||||||
|
("topic2", dummy_msg),
|
||||||
consumer.register.call_args.kwargs["cb"](msg)
|
("topic3", dummy_msg),
|
||||||
slot1.assert_called_once()
|
|
||||||
|
|
||||||
|
|
||||||
def test_connect_many_slots_one_topic(bec_dispatcher, consumer):
|
|
||||||
slot1, slot2 = Mock(), Mock()
|
|
||||||
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
|
|
||||||
consumer.register.assert_called_once()
|
|
||||||
bec_dispatcher.connect_slot(slot=slot2, topics="topic0")
|
|
||||||
consumer.register.assert_called_once()
|
|
||||||
# trigger consumer callback as if a message was published
|
|
||||||
consumer.register.call_args.kwargs["cb"](msg)
|
|
||||||
slot1.assert_called_once()
|
|
||||||
slot2.assert_called_once()
|
|
||||||
consumer.register.call_args.kwargs["cb"](msg)
|
|
||||||
assert slot1.call_count == 2
|
|
||||||
assert slot2.call_count == 2
|
|
||||||
|
|
||||||
|
|
||||||
def test_connect_one_slot_many_topics(bec_dispatcher, consumer):
|
|
||||||
slot1 = Mock()
|
|
||||||
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
|
|
||||||
assert consumer.register.call_count == 1
|
|
||||||
bec_dispatcher.connect_slot(slot=slot1, topics="topic1")
|
|
||||||
assert consumer.register.call_count == 2
|
|
||||||
# trigger consumer callback as if a message was published
|
|
||||||
consumer.register.call_args_list[0].kwargs["cb"](msg)
|
|
||||||
slot1.assert_called_once()
|
|
||||||
consumer.register.call_args_list[1].kwargs["cb"](msg)
|
|
||||||
assert slot1.call_count == 2
|
|
||||||
|
|
||||||
|
|
||||||
def test_disconnect_one_slot_one_topic(bec_dispatcher, consumer):
|
|
||||||
slot1, slot2 = Mock(), Mock()
|
|
||||||
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
|
|
||||||
|
|
||||||
# disconnect using a different topic
|
|
||||||
bec_dispatcher.disconnect_slot(slot=slot1, topics="topic1")
|
|
||||||
consumer.register.call_args.kwargs["cb"](msg)
|
|
||||||
assert slot1.call_count == 1
|
|
||||||
|
|
||||||
# disconnect using a different slot
|
|
||||||
bec_dispatcher.disconnect_slot(slot=slot2, topics="topic0")
|
|
||||||
consumer.register.call_args.kwargs["cb"](msg)
|
|
||||||
assert slot1.call_count == 2
|
|
||||||
|
|
||||||
# disconnect using the right slot and topics
|
|
||||||
bec_dispatcher.disconnect_slot(slot=slot1, topics="topic0")
|
|
||||||
# reset count to for slot
|
|
||||||
slot1.reset_mock()
|
|
||||||
consumer.register.call_args.kwargs["cb"](msg)
|
|
||||||
assert slot1.call_count == 0
|
|
||||||
|
|
||||||
|
|
||||||
def test_disconnect_identical(bec_dispatcher, consumer):
|
|
||||||
slot1 = Mock()
|
|
||||||
# Try to connect slot twice
|
|
||||||
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
|
|
||||||
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
|
|
||||||
|
|
||||||
# Test to call the slot once (slot should be not connected twice)
|
|
||||||
consumer.register.call_args.kwargs["cb"](msg)
|
|
||||||
assert slot1.call_count == 1
|
|
||||||
|
|
||||||
# Disconnect the slot
|
|
||||||
bec_dispatcher.disconnect_slot(slot=slot1, topics="topic0")
|
|
||||||
|
|
||||||
# Test to call the slot once (slot should be not connected anymore), count remains 1
|
|
||||||
consumer.register.call_args.kwargs["cb"](msg)
|
|
||||||
assert slot1.call_count == 1
|
|
||||||
|
|
||||||
|
|
||||||
def test_disconnect_many_slots_one_topic(bec_dispatcher, consumer):
|
|
||||||
slot1, slot2, slot3 = Mock(), Mock(), Mock()
|
|
||||||
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
|
|
||||||
bec_dispatcher.connect_slot(slot=slot2, topics="topic0")
|
|
||||||
|
|
||||||
# disconnect using a different slot
|
|
||||||
bec_dispatcher.disconnect_slot(slot3, topics="topic0")
|
|
||||||
consumer.register.call_args.kwargs["cb"](msg)
|
|
||||||
assert slot1.call_count == 1
|
|
||||||
assert slot2.call_count == 1
|
|
||||||
|
|
||||||
# disconnect using a different topics
|
|
||||||
bec_dispatcher.disconnect_slot(slot1, topics="topic1")
|
|
||||||
consumer.register.call_args.kwargs["cb"](msg)
|
|
||||||
assert slot1.call_count == 2
|
|
||||||
assert slot2.call_count == 2
|
|
||||||
|
|
||||||
# disconnect using the right slot and topics
|
|
||||||
bec_dispatcher.disconnect_slot(slot1, topics="topic0")
|
|
||||||
consumer.register.call_args.kwargs["cb"](msg)
|
|
||||||
assert slot1.call_count == 2
|
|
||||||
assert slot2.call_count == 3
|
|
||||||
|
|
||||||
|
|
||||||
def test_disconnect_one_slot_many_topics(bec_dispatcher, consumer):
|
|
||||||
slot1, slot2 = Mock(), Mock()
|
|
||||||
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
|
|
||||||
bec_dispatcher.connect_slot(slot=slot1, topics="topic1")
|
|
||||||
|
|
||||||
# disconnect using a different slot
|
|
||||||
bec_dispatcher.disconnect_slot(slot=slot2, topics="topic0")
|
|
||||||
consumer.register.call_args_list[0].kwargs["cb"](msg)
|
|
||||||
assert slot1.call_count == 1
|
|
||||||
consumer.register.call_args_list[1].kwargs["cb"](msg)
|
|
||||||
assert slot1.call_count == 2
|
|
||||||
|
|
||||||
# disconnect using a different topics
|
|
||||||
bec_dispatcher.disconnect_slot(slot=slot1, topics="topic3")
|
|
||||||
consumer.register.call_args_list[0].kwargs["cb"](msg)
|
|
||||||
assert slot1.call_count == 3
|
|
||||||
consumer.register.call_args_list[1].kwargs["cb"](msg)
|
|
||||||
assert slot1.call_count == 4
|
|
||||||
|
|
||||||
# disconnect using the right slot and topics
|
|
||||||
bec_dispatcher.disconnect_slot(slot=slot1, topics="topic0")
|
|
||||||
# Calling disconnected topic0 should not call slot1
|
|
||||||
consumer.register.call_args_list[0].kwargs["cb"](msg)
|
|
||||||
assert slot1.call_count == 4
|
|
||||||
# Calling topic1 should still call slot1
|
|
||||||
consumer.register.call_args_list[1].kwargs["cb"](msg)
|
|
||||||
assert slot1.call_count == 5
|
|
||||||
|
|
||||||
# disconnect remaining topic1 from slot1, calling any topic should not increase count
|
|
||||||
bec_dispatcher.disconnect_slot(slot=slot1, topics="topic1")
|
|
||||||
consumer.register.call_args_list[0].kwargs["cb"](msg)
|
|
||||||
consumer.register.call_args_list[1].kwargs["cb"](msg)
|
|
||||||
assert slot1.call_count == 5
|
|
||||||
|
|
||||||
|
|
||||||
def test_disconnect_all(bec_dispatcher, consumer):
|
|
||||||
# Mock slots to connect
|
|
||||||
slot1, slot2, slot3 = Mock(), Mock(), Mock()
|
|
||||||
|
|
||||||
# Connect slots to different topics
|
|
||||||
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
|
|
||||||
bec_dispatcher.connect_slot(slot=slot2, topics="topic1")
|
|
||||||
bec_dispatcher.connect_slot(slot=slot3, topics="topic2")
|
|
||||||
|
|
||||||
# Call disconnect_all method
|
|
||||||
bec_dispatcher.disconnect_all()
|
|
||||||
|
|
||||||
# Simulate messages and verify that none of the slots are called
|
|
||||||
consumer.register.call_args_list[0].kwargs["cb"](msg)
|
|
||||||
consumer.register.call_args_list[1].kwargs["cb"](msg)
|
|
||||||
consumer.register.call_args_list[2].kwargs["cb"](msg)
|
|
||||||
|
|
||||||
# Ensure that the slots have not been called
|
|
||||||
assert slot1.call_count == 0
|
|
||||||
assert slot2.call_count == 0
|
|
||||||
assert slot3.call_count == 0
|
|
||||||
|
|
||||||
# Also, check that the consumer for each topic is shutdown
|
|
||||||
assert "topic0" not in bec_dispatcher._connections
|
|
||||||
assert "topic1" not in bec_dispatcher._connections
|
|
||||||
assert "topic2" not in bec_dispatcher._connections
|
|
||||||
|
|
||||||
|
|
||||||
def test_connect_one_slot_multiple_topics_single_callback(bec_dispatcher, consumer):
|
|
||||||
slot1 = Mock()
|
|
||||||
|
|
||||||
# Connect the slot to multiple topics using a single callback
|
|
||||||
topics = ["topic1", "topic2"]
|
|
||||||
bec_dispatcher.connect_slot(slot=slot1, topics=topics, single_callback_for_all_topics=True)
|
|
||||||
|
|
||||||
# Verify the initial state
|
|
||||||
assert len(bec_dispatcher._connections) == 1 # One connection for all topics
|
|
||||||
assert len(bec_dispatcher._connections[tuple(sorted(topics))].slots) == 1 # One slot connected
|
|
||||||
|
|
||||||
# Simulate messages being published on each topic
|
|
||||||
for topic in topics:
|
|
||||||
msg_with_topic = MessageObject(
|
|
||||||
topic=topic, value=ScanMessage(point_id=0, scan_id="scan_id", data={})
|
|
||||||
)
|
)
|
||||||
consumer.register.call_args.kwargs["cb"](msg_with_topic)
|
],
|
||||||
|
)
|
||||||
|
def test_dispatcher_disconnect_all(bec_dispatcher_w_connector, qtbot):
|
||||||
|
bec_dispatcher = bec_dispatcher_w_connector
|
||||||
|
cb1 = mock.Mock(spec=[])
|
||||||
|
cb2 = mock.Mock(spec=[])
|
||||||
|
|
||||||
# Verify that the slot is called once for each topic
|
bec_dispatcher.connect_slot(cb1, "topic1")
|
||||||
assert slot1.call_count == len(topics)
|
bec_dispatcher.connect_slot(cb1, "topic2")
|
||||||
|
bec_dispatcher.connect_slot(cb2, "topic2")
|
||||||
|
bec_dispatcher.connect_slot(cb2, "topic3")
|
||||||
|
assert len(bec_dispatcher.client.connector._topics_cb) == 3
|
||||||
|
|
||||||
# Verify that a single consumer is created for all topics
|
|
||||||
consumer.register.assert_called_once()
|
|
||||||
|
|
||||||
|
|
||||||
def test_disconnect_all_with_single_callback_for_multiple_topics(bec_dispatcher, consumer):
|
|
||||||
slot1 = Mock()
|
|
||||||
|
|
||||||
# Connect the slot to multiple topics using a single callback
|
|
||||||
topics = ["topic1", "topic2"]
|
|
||||||
bec_dispatcher.connect_slot(slot=slot1, topics=topics, single_callback_for_all_topics=True)
|
|
||||||
|
|
||||||
# Verify the initial state
|
|
||||||
assert len(bec_dispatcher._connections) == 1 # One connection for all topics
|
|
||||||
assert len(bec_dispatcher._connections[tuple(sorted(topics))].slots) == 1 # One slot connected
|
|
||||||
|
|
||||||
# Call disconnect_all method
|
|
||||||
bec_dispatcher.disconnect_all()
|
bec_dispatcher.disconnect_all()
|
||||||
|
|
||||||
# Verify that the slot is disconnected
|
assert len(bec_dispatcher.client.connector._topics_cb) == 0
|
||||||
assert len(bec_dispatcher._connections) == 0 # All connections are removed
|
|
||||||
assert slot1.call_count == 0 # Slot has not been called
|
|
||||||
|
|
||||||
# Simulate messages and verify that the slot is not called
|
|
||||||
consumer.register.call_args.kwargs["cb"](msg)
|
|
||||||
assert slot1.call_count == 0 # Slot has not been called
|
|
||||||
|
Reference in New Issue
Block a user