tests(redis_connector): fixed test for messages that are not BECMessages

This commit is contained in:
wakonig_k 2024-12-13 11:16:23 +01:00
parent 64fcbe37b9
commit d42f26dad8

View File

@ -236,17 +236,14 @@ def test_redis_connector_lrange(connector, topic, start, end, use_pipe):
) )
def test_redis_connector_set_and_publish(connector, topic, msg, pipe, expire): def test_redis_connector_set_and_publish(connector, topic, msg, pipe, expire):
if not isinstance(msg, BECMessage): if not isinstance(msg, BECMessage):
with pytest.raises(TypeError): msg_sent = msg
connector.set_and_publish(topic, msg, pipe, expire)
else: else:
msg_sent = MsgpackSerialization.dumps(msg)
connector.set_and_publish(topic, msg, pipe, expire) connector.set_and_publish(topic, msg, pipe, expire)
connector._redis_conn.pipeline().publish.assert_called_once_with( connector._redis_conn.pipeline().publish.assert_called_once_with(topic, msg_sent)
topic, MsgpackSerialization.dumps(msg) connector._redis_conn.pipeline().set.assert_called_once_with(topic, msg_sent, ex=expire)
)
connector._redis_conn.pipeline().set.assert_called_once_with(
topic, MsgpackSerialization.dumps(msg), ex=expire
)
if not pipe: if not pipe:
connector._redis_conn.pipeline().execute.assert_called_once() connector._redis_conn.pipeline().execute.assert_called_once()