mirror of
https://github.com/ivan-usov-org/bec.git
synced 2025-04-21 18:20:01 +02:00
fix: race condition when reading new value from stream
This commit is contained in:
parent
d7db6befe9
commit
87cc71aa91
@ -540,8 +540,15 @@ class RedisConnector(ConnectorBase):
|
|||||||
else:
|
else:
|
||||||
with self._stream_topics_subscription_lock:
|
with self._stream_topics_subscription_lock:
|
||||||
for topic in topics:
|
for topic in topics:
|
||||||
|
try:
|
||||||
|
stream_info = self._redis_conn.xinfo_stream(topic)
|
||||||
|
except redis.exceptions.ResponseError:
|
||||||
|
# no such key
|
||||||
|
last_id = "0-0"
|
||||||
|
else:
|
||||||
|
last_id = stream_info["last-entry"][0].decode()
|
||||||
new_subscription = StreamSubscriptionInfo(
|
new_subscription = StreamSubscriptionInfo(
|
||||||
id="0-0" if from_start else "$",
|
id="0-0" if from_start else last_id,
|
||||||
topic=topic,
|
topic=topic,
|
||||||
newest_only=newest_only,
|
newest_only=newest_only,
|
||||||
from_start=from_start,
|
from_start=from_start,
|
||||||
|
@ -445,7 +445,7 @@ def test_redis_connector_register_stream_list(connected_connector, endpoint):
|
|||||||
assert len(connector._stream_topics_subscription) == 0
|
assert len(connector._stream_topics_subscription) == 0
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.timeout(5)
|
@pytest.mark.timeout(10)
|
||||||
def test_redis_connector_register_stream_from_start(connected_connector):
|
def test_redis_connector_register_stream_from_start(connected_connector):
|
||||||
connector = connected_connector
|
connector = connected_connector
|
||||||
cb_mock1 = mock.Mock(spec=[]) # spec is here to remove all attributes
|
cb_mock1 = mock.Mock(spec=[]) # spec is here to remove all attributes
|
||||||
@ -468,6 +468,11 @@ def test_redis_connector_register_stream_from_start(connected_connector):
|
|||||||
cb_mock1.assert_has_calls(
|
cb_mock1.assert_has_calls(
|
||||||
[mock.call({"data": 1}, a=3), mock.call({"data": 2}, a=3), mock.call({"data": 3}, a=3)]
|
[mock.call({"data": 1}, a=3), mock.call({"data": 2}, a=3), mock.call({"data": 3}, a=3)]
|
||||||
)
|
)
|
||||||
|
cb_mock1.reset_mock()
|
||||||
|
connector.register(TestStreamEndpoint, cb=cb_mock1, start_thread=False, a=4)
|
||||||
|
with pytest.raises(TimeoutError):
|
||||||
|
connector.poll_messages(timeout=1)
|
||||||
|
cb_mock1.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.timeout(5)
|
@pytest.mark.timeout(5)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user