refactor(redis_connector): update authenticate method to use keyword-only arguments

This commit is contained in:
wakonig_k 2025-03-08 14:12:59 +01:00
parent 814ce4be61
commit 6500883017
3 changed files with 14 additions and 12 deletions

View File

@ -57,13 +57,13 @@ class BECAccess:
else:
token = self._local_login(selected_account)
self.connector.authenticate(token, selected_account)
self.connector.authenticate(username=selected_account, password=token)
def login_with_token(self, username: str, token: str | None):
def login_with_token(self, *, username: str, token: str | None):
"""
Login with a username and token.
"""
self.connector.authenticate(token, username)
self.connector.authenticate(username=username, password=token)
def _ask_user_for_account(self, console: Console) -> str:
"""
@ -239,14 +239,14 @@ class BECAccess:
return False
def _user_service_login(self) -> None:
self.connector.authenticate("bec", "bec")
self.connector.authenticate(username="bec", password="bec")
self._info: messages.LoginInfoMessage | None = self.connector.get(
MessageEndpoints.login_info()
)
self._atlas_login = self._info.atlas_login
# pylint: disable=protected-access
if not self._atlas_login:
self.login_with_token("user", None)
self.login_with_token(username="user", token=None)
else:
self.login()

View File

@ -244,9 +244,11 @@ class RedisConnector:
self._generator_executor = ThreadPoolExecutor()
def authenticate(self, password: str | None = None, username: str = "default"):
def authenticate(self, *, username: str = "default", password: str | None = None):
"""
Authenticate to the redis server
Authenticate to the redis server.
Please note that the arguments are keyword-only. This is to avoid confusion as the
underlying redis library accepts the password as the first argument.
Args:
password (str): password

View File

@ -11,16 +11,16 @@ class BECAccessDemo: # pragma: no cover
self.connector = connector
else:
self.connector = RedisConnector("localhost:6379")
self.connector.authenticate(*self._find_admin_account())
self.connector.authenticate(**self._find_admin_account())
self.username = "user"
self.admin_username = "admin"
self.deployment_id = "test_deployment"
def _find_admin_account(self):
def _find_admin_account(self) -> dict[str, str]:
for user, token in [("default", None), ("bec", "bec"), ("admin", "admin")]:
try:
self.connector.authenticate(token, user)
return token, user
self.connector.authenticate(username=user, password=token)
return {"username": user, "password": token}
except Exception:
pass
raise RuntimeError("No admin account found. Please restart the Redis server.")
@ -100,7 +100,7 @@ class BECAccessDemo: # pragma: no cover
def reset(self):
try:
self.connector.authenticate("admin", "admin")
self.connector.authenticate(username="admin", password="admin")
# pylint: disable=broad-except
except Exception:
pass