diff --git a/bec_lib/bec_lib/redis_connector.py b/bec_lib/bec_lib/redis_connector.py index 7863c4ba..fdd34f7c 100644 --- a/bec_lib/bec_lib/redis_connector.py +++ b/bec_lib/bec_lib/redis_connector.py @@ -540,8 +540,15 @@ class RedisConnector(ConnectorBase): else: with self._stream_topics_subscription_lock: for topic in topics: + try: + stream_info = self._redis_conn.xinfo_stream(topic) + except redis.exceptions.ResponseError: + # no such key + last_id = "0-0" + else: + last_id = stream_info["last-entry"][0].decode() new_subscription = StreamSubscriptionInfo( - id="0-0" if from_start else "$", + id="0-0" if from_start else last_id, topic=topic, newest_only=newest_only, from_start=from_start, diff --git a/bec_lib/tests/test_redis_connector_fakeredis.py b/bec_lib/tests/test_redis_connector_fakeredis.py index ab2e2ec5..0426d61a 100644 --- a/bec_lib/tests/test_redis_connector_fakeredis.py +++ b/bec_lib/tests/test_redis_connector_fakeredis.py @@ -445,7 +445,7 @@ def test_redis_connector_register_stream_list(connected_connector, endpoint): assert len(connector._stream_topics_subscription) == 0 -@pytest.mark.timeout(5) +@pytest.mark.timeout(10) def test_redis_connector_register_stream_from_start(connected_connector): connector = connected_connector cb_mock1 = mock.Mock(spec=[]) # spec is here to remove all attributes @@ -468,6 +468,11 @@ def test_redis_connector_register_stream_from_start(connected_connector): cb_mock1.assert_has_calls( [mock.call({"data": 1}, a=3), mock.call({"data": 2}, a=3), mock.call({"data": 3}, a=3)] ) + cb_mock1.reset_mock() + connector.register(TestStreamEndpoint, cb=cb_mock1, start_thread=False, a=4) + with pytest.raises(TimeoutError): + connector.poll_messages(timeout=1) + cb_mock1.assert_not_called() @pytest.mark.timeout(5)