From 301bb916da9716f5d2d515279b5765d3b3722112 Mon Sep 17 00:00:00 2001 From: Mathias Guijarro Date: Mon, 8 Apr 2024 14:37:34 +0200 Subject: [PATCH] test(utils/bec_dispatcher): tests fixed --- tests/test_bec_dispatcher.py | 265 ++++++----------------------------- 1 file changed, 42 insertions(+), 223 deletions(-) diff --git a/tests/test_bec_dispatcher.py b/tests/test_bec_dispatcher.py index e4694487..5528e2cc 100644 --- a/tests/test_bec_dispatcher.py +++ b/tests/test_bec_dispatcher.py @@ -1,239 +1,58 @@ # pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring -from unittest.mock import Mock +import time +from unittest import mock import pytest +import redis from bec_lib.connector import MessageObject from bec_lib.messages import ScanMessage - -msg = MessageObject(topic="", value=ScanMessage(point_id=0, scan_id="scan_id", data={})) +from bec_lib.redis_connector import RedisConnector +from bec_lib.serialization import MsgpackSerialization +from bec_widgets.utils.bec_dispatcher import QtRedisConnector -@pytest.fixture(name="consumer") -def _consumer(bec_dispatcher): - bec_dispatcher.client.connector = Mock() - yield bec_dispatcher.client.connector +@pytest.fixture +def bec_dispatcher_w_connector(bec_dispatcher, topics_msg_list): + def pubsub_msg_generator(): + for topic, msg in topics_msg_list: + yield {"channel": topic.encode(), "pattern": None, "data": msg} + while True: + time.sleep(0.2) + yield StopIteration + + with mock.patch("redis.Redis"): + pubsub = redis.Redis().pubsub() + messages = pubsub_msg_generator() + pubsub.get_message.side_effect = lambda timeout: next(messages) + connector = QtRedisConnector("localhost:1") + bec_dispatcher.client.connector = connector + yield bec_dispatcher -@pytest.mark.filterwarnings("ignore:Failed to connect to redis.") -def test_connect_one_slot(bec_dispatcher, consumer): - slot1 = Mock() - bec_dispatcher.connect_slot(slot=slot1, topics="topic0") - consumer.register.assert_called_once() - # trigger consumer callback as if a message was published - consumer.register.call_args.kwargs["cb"](msg) - slot1.assert_called_once() - consumer.register.call_args.kwargs["cb"](msg) - assert slot1.call_count == 2 +dummy_msg = MsgpackSerialization.dumps(ScanMessage(point_id=0, scan_id=0, data={})) -def test_connect_identical(bec_dispatcher, consumer): - slot1 = Mock() - bec_dispatcher.connect_slot(slot=slot1, topics="topic0") - bec_dispatcher.connect_slot(slot=slot1, topics="topic0") - consumer.register.assert_called_once() - - consumer.register.call_args.kwargs["cb"](msg) - slot1.assert_called_once() - - -def test_connect_many_slots_one_topic(bec_dispatcher, consumer): - slot1, slot2 = Mock(), Mock() - bec_dispatcher.connect_slot(slot=slot1, topics="topic0") - consumer.register.assert_called_once() - bec_dispatcher.connect_slot(slot=slot2, topics="topic0") - consumer.register.assert_called_once() - # trigger consumer callback as if a message was published - consumer.register.call_args.kwargs["cb"](msg) - slot1.assert_called_once() - slot2.assert_called_once() - consumer.register.call_args.kwargs["cb"](msg) - assert slot1.call_count == 2 - assert slot2.call_count == 2 - - -def test_connect_one_slot_many_topics(bec_dispatcher, consumer): - slot1 = Mock() - bec_dispatcher.connect_slot(slot=slot1, topics="topic0") - assert consumer.register.call_count == 1 - bec_dispatcher.connect_slot(slot=slot1, topics="topic1") - assert consumer.register.call_count == 2 - # trigger consumer callback as if a message was published - consumer.register.call_args_list[0].kwargs["cb"](msg) - slot1.assert_called_once() - consumer.register.call_args_list[1].kwargs["cb"](msg) - assert slot1.call_count == 2 - - -def test_disconnect_one_slot_one_topic(bec_dispatcher, consumer): - slot1, slot2 = Mock(), Mock() - bec_dispatcher.connect_slot(slot=slot1, topics="topic0") - - # disconnect using a different topic - bec_dispatcher.disconnect_slot(slot=slot1, topics="topic1") - consumer.register.call_args.kwargs["cb"](msg) - assert slot1.call_count == 1 - - # disconnect using a different slot - bec_dispatcher.disconnect_slot(slot=slot2, topics="topic0") - consumer.register.call_args.kwargs["cb"](msg) - assert slot1.call_count == 2 - - # disconnect using the right slot and topics - bec_dispatcher.disconnect_slot(slot=slot1, topics="topic0") - # reset count to for slot - slot1.reset_mock() - consumer.register.call_args.kwargs["cb"](msg) - assert slot1.call_count == 0 - - -def test_disconnect_identical(bec_dispatcher, consumer): - slot1 = Mock() - # Try to connect slot twice - bec_dispatcher.connect_slot(slot=slot1, topics="topic0") - bec_dispatcher.connect_slot(slot=slot1, topics="topic0") - - # Test to call the slot once (slot should be not connected twice) - consumer.register.call_args.kwargs["cb"](msg) - assert slot1.call_count == 1 - - # Disconnect the slot - bec_dispatcher.disconnect_slot(slot=slot1, topics="topic0") - - # Test to call the slot once (slot should be not connected anymore), count remains 1 - consumer.register.call_args.kwargs["cb"](msg) - assert slot1.call_count == 1 - - -def test_disconnect_many_slots_one_topic(bec_dispatcher, consumer): - slot1, slot2, slot3 = Mock(), Mock(), Mock() - bec_dispatcher.connect_slot(slot=slot1, topics="topic0") - bec_dispatcher.connect_slot(slot=slot2, topics="topic0") - - # disconnect using a different slot - bec_dispatcher.disconnect_slot(slot3, topics="topic0") - consumer.register.call_args.kwargs["cb"](msg) - assert slot1.call_count == 1 - assert slot2.call_count == 1 - - # disconnect using a different topics - bec_dispatcher.disconnect_slot(slot1, topics="topic1") - consumer.register.call_args.kwargs["cb"](msg) - assert slot1.call_count == 2 - assert slot2.call_count == 2 - - # disconnect using the right slot and topics - bec_dispatcher.disconnect_slot(slot1, topics="topic0") - consumer.register.call_args.kwargs["cb"](msg) - assert slot1.call_count == 2 - assert slot2.call_count == 3 - - -def test_disconnect_one_slot_many_topics(bec_dispatcher, consumer): - slot1, slot2 = Mock(), Mock() - bec_dispatcher.connect_slot(slot=slot1, topics="topic0") - bec_dispatcher.connect_slot(slot=slot1, topics="topic1") - - # disconnect using a different slot - bec_dispatcher.disconnect_slot(slot=slot2, topics="topic0") - consumer.register.call_args_list[0].kwargs["cb"](msg) - assert slot1.call_count == 1 - consumer.register.call_args_list[1].kwargs["cb"](msg) - assert slot1.call_count == 2 - - # disconnect using a different topics - bec_dispatcher.disconnect_slot(slot=slot1, topics="topic3") - consumer.register.call_args_list[0].kwargs["cb"](msg) - assert slot1.call_count == 3 - consumer.register.call_args_list[1].kwargs["cb"](msg) - assert slot1.call_count == 4 - - # disconnect using the right slot and topics - bec_dispatcher.disconnect_slot(slot=slot1, topics="topic0") - # Calling disconnected topic0 should not call slot1 - consumer.register.call_args_list[0].kwargs["cb"](msg) - assert slot1.call_count == 4 - # Calling topic1 should still call slot1 - consumer.register.call_args_list[1].kwargs["cb"](msg) - assert slot1.call_count == 5 - - # disconnect remaining topic1 from slot1, calling any topic should not increase count - bec_dispatcher.disconnect_slot(slot=slot1, topics="topic1") - consumer.register.call_args_list[0].kwargs["cb"](msg) - consumer.register.call_args_list[1].kwargs["cb"](msg) - assert slot1.call_count == 5 - - -def test_disconnect_all(bec_dispatcher, consumer): - # Mock slots to connect - slot1, slot2, slot3 = Mock(), Mock(), Mock() - - # Connect slots to different topics - bec_dispatcher.connect_slot(slot=slot1, topics="topic0") - bec_dispatcher.connect_slot(slot=slot2, topics="topic1") - bec_dispatcher.connect_slot(slot=slot3, topics="topic2") - - # Call disconnect_all method - bec_dispatcher.disconnect_all() - - # Simulate messages and verify that none of the slots are called - consumer.register.call_args_list[0].kwargs["cb"](msg) - consumer.register.call_args_list[1].kwargs["cb"](msg) - consumer.register.call_args_list[2].kwargs["cb"](msg) - - # Ensure that the slots have not been called - assert slot1.call_count == 0 - assert slot2.call_count == 0 - assert slot3.call_count == 0 - - # Also, check that the consumer for each topic is shutdown - assert "topic0" not in bec_dispatcher._connections - assert "topic1" not in bec_dispatcher._connections - assert "topic2" not in bec_dispatcher._connections - - -def test_connect_one_slot_multiple_topics_single_callback(bec_dispatcher, consumer): - slot1 = Mock() - - # Connect the slot to multiple topics using a single callback - topics = ["topic1", "topic2"] - bec_dispatcher.connect_slot(slot=slot1, topics=topics, single_callback_for_all_topics=True) - - # Verify the initial state - assert len(bec_dispatcher._connections) == 1 # One connection for all topics - assert len(bec_dispatcher._connections[tuple(sorted(topics))].slots) == 1 # One slot connected - - # Simulate messages being published on each topic - for topic in topics: - msg_with_topic = MessageObject( - topic=topic, value=ScanMessage(point_id=0, scan_id="scan_id", data={}) +@pytest.mark.parametrize( + "topics_msg_list", + [ + ( + ("topic1", dummy_msg), + ("topic2", dummy_msg), + ("topic3", dummy_msg), ) - consumer.register.call_args.kwargs["cb"](msg_with_topic) + ], +) +def test_dispatcher_disconnect_all(bec_dispatcher_w_connector, qtbot): + bec_dispatcher = bec_dispatcher_w_connector + cb1 = mock.Mock(spec=[]) + cb2 = mock.Mock(spec=[]) - # Verify that the slot is called once for each topic - assert slot1.call_count == len(topics) + bec_dispatcher.connect_slot(cb1, "topic1") + bec_dispatcher.connect_slot(cb1, "topic2") + bec_dispatcher.connect_slot(cb2, "topic2") + bec_dispatcher.connect_slot(cb2, "topic3") + assert len(bec_dispatcher.client.connector._topics_cb) == 3 - # Verify that a single consumer is created for all topics - consumer.register.assert_called_once() - - -def test_disconnect_all_with_single_callback_for_multiple_topics(bec_dispatcher, consumer): - slot1 = Mock() - - # Connect the slot to multiple topics using a single callback - topics = ["topic1", "topic2"] - bec_dispatcher.connect_slot(slot=slot1, topics=topics, single_callback_for_all_topics=True) - - # Verify the initial state - assert len(bec_dispatcher._connections) == 1 # One connection for all topics - assert len(bec_dispatcher._connections[tuple(sorted(topics))].slots) == 1 # One slot connected - - # Call disconnect_all method bec_dispatcher.disconnect_all() - # Verify that the slot is disconnected - assert len(bec_dispatcher._connections) == 0 # All connections are removed - assert slot1.call_count == 0 # Slot has not been called - - # Simulate messages and verify that the slot is not called - consumer.register.call_args.kwargs["cb"](msg) - assert slot1.call_count == 0 # Slot has not been called + assert len(bec_dispatcher.client.connector._topics_cb) == 0