fix: fixed xadd for pipelines

This commit is contained in:
wakonig_k 2023-08-31 21:54:08 +02:00
parent 57c989cfe2
commit d19fce7d21
2 changed files with 29 additions and 3 deletions

View File

@ -316,16 +316,35 @@ class RedisProducer(ProducerConnector):
@catch_connection_error @catch_connection_error
def xadd(self, topic: str, msg: dict, max_size=None, pipe=None, expire: int = None): def xadd(self, topic: str, msg: dict, max_size=None, pipe=None, expire: int = None):
"""add to stream""" """
add to stream
Args:
topic (str): redis topic
msg (dict): message to add
max_size (int, optional): max size of stream. Defaults to None.
pipe (Pipeline, optional): redis pipe. Defaults to None.
expire (int, optional): expire time. Defaults to None.
Examples:
>>> redis.xadd("test", {"test": "test"})
>>> redis.xadd("test", {"test": "test"}, max_size=10)
"""
topic = trim_topic(topic, ":val") topic = trim_topic(topic, ":val")
client = pipe if pipe is not None else self.pipeline() if pipe:
client = pipe
elif expire:
client = self.pipeline()
else:
client = self.r
if max_size: if max_size:
client.xadd(f"{topic}:val", msg, maxlen=max_size) client.xadd(f"{topic}:val", msg, maxlen=max_size)
else: else:
client.xadd(f"{topic}:val", msg) client.xadd(f"{topic}:val", msg)
if expire: if expire:
client.expire(f"{topic}:val", expire) client.expire(f"{topic}:val", expire)
if not pipe: if not pipe or expire:
client.execute() client.execute()
@catch_connection_error @catch_connection_error

View File

@ -453,6 +453,13 @@ def test_redis_connector_xadd_with_maxlen(producer):
producer.r.xadd.assert_called_once_with("topic:val", {"key": "value"}, maxlen=100) producer.r.xadd.assert_called_once_with("topic:val", {"key": "value"}, maxlen=100)
def test_redis_connector_xadd_with_expire(producer):
producer.xadd("topic", {"key": "value"}, expire=100)
producer.r.pipeline().xadd.assert_called_once_with("topic:val", {"key": "value"})
producer.r.pipeline().expire.assert_called_once_with("topic:val", 100)
producer.r.pipeline().execute.assert_called_once()
def test_redis_connector_xread(producer): def test_redis_connector_xread(producer):
producer.xread("topic", "id") producer.xread("topic", "id")
producer.r.xread.assert_called_once_with({"topic:val": "id"}, count=None, block=None) producer.r.xread.assert_called_once_with({"topic:val": "id"}, count=None, block=None)