mirror of
https://github.com/ivan-usov-org/bec.git
synced 2025-04-21 10:10:02 +02:00
test: added redis connector tests
This commit is contained in:
parent
dc954e2b3c
commit
2f30c64cd7
@ -14,6 +14,7 @@ from bec_lib.core.redis_connector import (
|
||||
RedisConsumerMixin,
|
||||
RedisConsumerThreaded,
|
||||
RedisProducer,
|
||||
RedisStreamConsumerThreaded,
|
||||
)
|
||||
|
||||
|
||||
@ -479,3 +480,122 @@ def test_redis_connector_xread_from_new_topic(producer):
|
||||
)
|
||||
producer.xread("topic", from_start=False)
|
||||
producer.r.xread.assert_called_once_with({"topic:val": "0-0"}, count=None, block=None)
|
||||
|
||||
|
||||
def test_redis_consumer_threaded_no_cb_without_messages(consumer_threaded):
|
||||
with mock.patch.object(consumer_threaded.pubsub, "get_message", return_value=None):
|
||||
consumer_threaded.cb = mock.MagicMock()
|
||||
consumer_threaded.poll_messages()
|
||||
consumer_threaded.cb.assert_not_called()
|
||||
|
||||
|
||||
def test_redis_consumer_threaded_cb_called_with_messages(consumer_threaded):
|
||||
messages = {"channel": b"topic:sub", "data": '{"key": "value"}'}
|
||||
|
||||
with mock.patch.object(consumer_threaded.pubsub, "get_message", return_value=messages):
|
||||
consumer_threaded.cb = mock.MagicMock()
|
||||
consumer_threaded.poll_messages()
|
||||
msg = MessageObject(topic=b"topic:sub", value='{"key": "value"}')
|
||||
consumer_threaded.cb.assert_called_once_with(msg)
|
||||
|
||||
|
||||
def test_redis_consumer_threaded_shutdown(consumer_threaded):
|
||||
consumer_threaded.shutdown()
|
||||
consumer_threaded.pubsub.close.assert_called_once()
|
||||
|
||||
|
||||
def test_redis_stream_consumer_threaded_get_newest_message():
|
||||
consumer = RedisStreamConsumerThreaded(
|
||||
"localhost", "1", topics="topic", cb=mock.MagicMock(), redis_cls=mock.MagicMock()
|
||||
)
|
||||
consumer.r.xrevrange.return_value = [(b"1691610882756-0", {b"data": b"msg"})]
|
||||
msgs = []
|
||||
consumer.get_newest_message(msgs)
|
||||
assert "topic:stream" in consumer.stream_keys
|
||||
assert consumer.stream_keys["topic:stream"] == b"1691610882756-0"
|
||||
|
||||
|
||||
def test_redis_stream_consumer_threaded_get_newest_message_no_msg():
|
||||
consumer = RedisStreamConsumerThreaded(
|
||||
"localhost", "1", topics="topic", cb=mock.MagicMock(), redis_cls=mock.MagicMock()
|
||||
)
|
||||
consumer.r.xrevrange.return_value = []
|
||||
msgs = []
|
||||
consumer.get_newest_message(msgs)
|
||||
assert "topic:stream" in consumer.stream_keys
|
||||
assert consumer.stream_keys["topic:stream"] == "0-0"
|
||||
|
||||
|
||||
def test_redis_stream_consumer_threaded_get_id():
|
||||
consumer = RedisStreamConsumerThreaded(
|
||||
"localhost", "1", topics="topic", cb=mock.MagicMock(), redis_cls=mock.MagicMock()
|
||||
)
|
||||
consumer.stream_keys["topic:stream"] = b"1691610882756-0"
|
||||
assert consumer.get_id("topic:stream") == b"1691610882756-0"
|
||||
assert consumer.get_id("doesnt_exist") == "0-0"
|
||||
|
||||
|
||||
def test_redis_stream_consumer_threaded_poll_messages():
|
||||
consumer = RedisStreamConsumerThreaded(
|
||||
"localhost", "1", topics="topic", cb=mock.MagicMock(), redis_cls=mock.MagicMock()
|
||||
)
|
||||
with mock.patch.object(
|
||||
consumer, "get_newest_message", return_value=None
|
||||
) as mock_get_newest_message:
|
||||
consumer.poll_messages()
|
||||
mock_get_newest_message.assert_called_once()
|
||||
consumer.r.xread.assert_not_called()
|
||||
|
||||
|
||||
def test_redis_stream_consumer_threaded_poll_messages_newest_only():
|
||||
consumer = RedisStreamConsumerThreaded(
|
||||
"localhost",
|
||||
"1",
|
||||
topics="topic",
|
||||
cb=mock.MagicMock(),
|
||||
redis_cls=mock.MagicMock(),
|
||||
newest_only=True,
|
||||
)
|
||||
|
||||
consumer.r.xrevrange.return_value = [(b"1691610882756-0", {b"data": b"msg"})]
|
||||
consumer.poll_messages()
|
||||
consumer.r.xread.assert_not_called()
|
||||
consumer.cb.assert_called_once_with(MessageObject(topic="topic:stream", value=b"msg"))
|
||||
|
||||
|
||||
def test_redis_stream_consumer_threaded_poll_messages_read():
|
||||
consumer = RedisStreamConsumerThreaded(
|
||||
"localhost",
|
||||
"1",
|
||||
topics="topic",
|
||||
cb=mock.MagicMock(),
|
||||
redis_cls=mock.MagicMock(),
|
||||
)
|
||||
consumer.stream_keys["topic:stream"] = "0-0"
|
||||
|
||||
msg = [[b"topic:stream", [(b"1691610714612-0", {b"data": b"msg"})]]]
|
||||
|
||||
consumer.r.xread.return_value = msg
|
||||
consumer.poll_messages()
|
||||
consumer.r.xread.assert_called_once_with({"topic:stream": "0-0"}, count=1)
|
||||
consumer.cb.assert_called_once_with(MessageObject(topic="topic:stream", value=b"msg"))
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"topics,expected",
|
||||
[
|
||||
("topic", ["topic:stream"]),
|
||||
(["topic"], ["topic:stream"]),
|
||||
(["topic:stream"], ["topic:stream"]),
|
||||
(["topic:stream", "topic2:stream"], ["topic:stream", "topic2:stream"]),
|
||||
],
|
||||
)
|
||||
def test_redis_stream_consumer_threaded_init_topics(topics, expected):
|
||||
consumer = RedisStreamConsumerThreaded(
|
||||
"localhost",
|
||||
"1",
|
||||
topics=topics,
|
||||
cb=mock.MagicMock(),
|
||||
redis_cls=mock.MagicMock(),
|
||||
)
|
||||
assert consumer.topics == expected
|
||||
|
Loading…
x
Reference in New Issue
Block a user