From 2f30c64cd76f15b81b9f128d9a5562f35be85a8a Mon Sep 17 00:00:00 2001 From: Klaus Wakonig Date: Thu, 10 Aug 2023 21:32:52 +0200 Subject: [PATCH] test: added redis connector tests --- bec_lib/tests/test_redis_connector.py | 120 ++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/bec_lib/tests/test_redis_connector.py b/bec_lib/tests/test_redis_connector.py index 3c6e881e..9bbb4af9 100644 --- a/bec_lib/tests/test_redis_connector.py +++ b/bec_lib/tests/test_redis_connector.py @@ -14,6 +14,7 @@ from bec_lib.core.redis_connector import ( RedisConsumerMixin, RedisConsumerThreaded, RedisProducer, + RedisStreamConsumerThreaded, ) @@ -479,3 +480,122 @@ def test_redis_connector_xread_from_new_topic(producer): ) producer.xread("topic", from_start=False) producer.r.xread.assert_called_once_with({"topic:val": "0-0"}, count=None, block=None) + + +def test_redis_consumer_threaded_no_cb_without_messages(consumer_threaded): + with mock.patch.object(consumer_threaded.pubsub, "get_message", return_value=None): + consumer_threaded.cb = mock.MagicMock() + consumer_threaded.poll_messages() + consumer_threaded.cb.assert_not_called() + + +def test_redis_consumer_threaded_cb_called_with_messages(consumer_threaded): + messages = {"channel": b"topic:sub", "data": '{"key": "value"}'} + + with mock.patch.object(consumer_threaded.pubsub, "get_message", return_value=messages): + consumer_threaded.cb = mock.MagicMock() + consumer_threaded.poll_messages() + msg = MessageObject(topic=b"topic:sub", value='{"key": "value"}') + consumer_threaded.cb.assert_called_once_with(msg) + + +def test_redis_consumer_threaded_shutdown(consumer_threaded): + consumer_threaded.shutdown() + consumer_threaded.pubsub.close.assert_called_once() + + +def test_redis_stream_consumer_threaded_get_newest_message(): + consumer = RedisStreamConsumerThreaded( + "localhost", "1", topics="topic", cb=mock.MagicMock(), redis_cls=mock.MagicMock() + ) + consumer.r.xrevrange.return_value = [(b"1691610882756-0", {b"data": b"msg"})] + msgs = [] + consumer.get_newest_message(msgs) + assert "topic:stream" in consumer.stream_keys + assert consumer.stream_keys["topic:stream"] == b"1691610882756-0" + + +def test_redis_stream_consumer_threaded_get_newest_message_no_msg(): + consumer = RedisStreamConsumerThreaded( + "localhost", "1", topics="topic", cb=mock.MagicMock(), redis_cls=mock.MagicMock() + ) + consumer.r.xrevrange.return_value = [] + msgs = [] + consumer.get_newest_message(msgs) + assert "topic:stream" in consumer.stream_keys + assert consumer.stream_keys["topic:stream"] == "0-0" + + +def test_redis_stream_consumer_threaded_get_id(): + consumer = RedisStreamConsumerThreaded( + "localhost", "1", topics="topic", cb=mock.MagicMock(), redis_cls=mock.MagicMock() + ) + consumer.stream_keys["topic:stream"] = b"1691610882756-0" + assert consumer.get_id("topic:stream") == b"1691610882756-0" + assert consumer.get_id("doesnt_exist") == "0-0" + + +def test_redis_stream_consumer_threaded_poll_messages(): + consumer = RedisStreamConsumerThreaded( + "localhost", "1", topics="topic", cb=mock.MagicMock(), redis_cls=mock.MagicMock() + ) + with mock.patch.object( + consumer, "get_newest_message", return_value=None + ) as mock_get_newest_message: + consumer.poll_messages() + mock_get_newest_message.assert_called_once() + consumer.r.xread.assert_not_called() + + +def test_redis_stream_consumer_threaded_poll_messages_newest_only(): + consumer = RedisStreamConsumerThreaded( + "localhost", + "1", + topics="topic", + cb=mock.MagicMock(), + redis_cls=mock.MagicMock(), + newest_only=True, + ) + + consumer.r.xrevrange.return_value = [(b"1691610882756-0", {b"data": b"msg"})] + consumer.poll_messages() + consumer.r.xread.assert_not_called() + consumer.cb.assert_called_once_with(MessageObject(topic="topic:stream", value=b"msg")) + + +def test_redis_stream_consumer_threaded_poll_messages_read(): + consumer = RedisStreamConsumerThreaded( + "localhost", + "1", + topics="topic", + cb=mock.MagicMock(), + redis_cls=mock.MagicMock(), + ) + consumer.stream_keys["topic:stream"] = "0-0" + + msg = [[b"topic:stream", [(b"1691610714612-0", {b"data": b"msg"})]]] + + consumer.r.xread.return_value = msg + consumer.poll_messages() + consumer.r.xread.assert_called_once_with({"topic:stream": "0-0"}, count=1) + consumer.cb.assert_called_once_with(MessageObject(topic="topic:stream", value=b"msg")) + + +@pytest.mark.parametrize( + "topics,expected", + [ + ("topic", ["topic:stream"]), + (["topic"], ["topic:stream"]), + (["topic:stream"], ["topic:stream"]), + (["topic:stream", "topic2:stream"], ["topic:stream", "topic2:stream"]), + ], +) +def test_redis_stream_consumer_threaded_init_topics(topics, expected): + consumer = RedisStreamConsumerThreaded( + "localhost", + "1", + topics=topics, + cb=mock.MagicMock(), + redis_cls=mock.MagicMock(), + ) + assert consumer.topics == expected