feat(bec_access): add login route to retrieve access key and refactor user login handling

This commit is contained in:
2025-03-05 22:45:30 +01:00
parent ff6946d2bc
commit ee46c9c15b
4 changed files with 73 additions and 8 deletions

View File

@ -88,7 +88,9 @@ class AtlasApp:
self.app.include_router(self.deployment_access_router.router, tags=["Deployment Access"])
# BEC Access
self.bec_access_router = BECAccessRouter(prefix=self.prefix, datasources=self.datasources)
self.bec_access_router = BECAccessRouter(
prefix=self.prefix, datasources=self.datasources, app=self
)
self.app.include_router(self.bec_access_router.router, tags=["BEC Access"])
# Session

View File

@ -8,14 +8,22 @@ from bec_atlas.authentication import convert_to_user, get_current_user
from bec_atlas.datasources.mongodb.mongodb import MongoDBDatasource
from bec_atlas.model.model import BECAccessProfile, User
from bec_atlas.router.base_router import BaseRouter
from bec_atlas.router.user_router import UserLoginRequest
if TYPE_CHECKING: # pragma: no cover
from bec_atlas.datasources.datasource_manager import DatasourceManager
from bec_atlas.main import AtlasApp
class BECAccessRouter(BaseRouter):
def __init__(self, prefix="/api/v1", datasources: DatasourceManager | None = None):
def __init__(
self,
prefix="/api/v1",
datasources: DatasourceManager | None = None,
app: AtlasApp | None = None,
):
super().__init__(prefix, datasources)
self.app = app
if not self.datasources:
raise RuntimeError("Datasources not loaded")
@ -28,6 +36,12 @@ class BECAccessRouter(BaseRouter):
methods=["GET"],
description="Retrieve the access key for a specific deployment and user.",
)
self.router.add_api_route(
"/bec_access_login",
self.get_bec_access_login,
methods=["POST"],
description="Login and retrieve the access key for a specific deployment and user.",
)
@convert_to_user
async def get_bec_access(
@ -47,9 +61,41 @@ class BECAccessRouter(BaseRouter):
"""
if not user:
user = current_user.email
out = self._retrieve_access_account(deployment_id, user, current_user=current_user)
return out
async def get_bec_access_login(
self, user_login: UserLoginRequest, deployment_id: str, user: str
) -> dict:
"""
Login and retrieve the access key for a specific deployment and user.
Exactly the same as get_bec_access, but with a login step to avoid the need
for an access token only to retrieve the ACL key.
Args:
deployment_id (str): The deployment id
user (str): The user name to retrieve the access key for
password (str): The password for the user
"""
if not self.app:
raise RuntimeError("App not loaded")
user_info = self.app.user_router._get_user(user_login)
if not user_info:
raise HTTPException(status_code=404, detail="User not found.")
current_user = self.db.get_user_by_email(user_info.email)
if not current_user:
raise HTTPException(status_code=404, detail="User not found.")
out = self._retrieve_access_account(deployment_id, user, current_user=current_user)
return out
def _retrieve_access_account(
self, deployment_id: str, username: str, current_user: User
) -> dict:
out = self.db.find_one(
"bec_access_profiles",
{"deployment_id": deployment_id, "username": user},
{"deployment_id": deployment_id, "username": username},
BECAccessProfile,
user=current_user,
)

View File

@ -57,17 +57,22 @@ class UserRouter(BaseRouter):
return {"access_token": out, "token_type": "bearer"}
async def user_login(self, user_login: UserLoginRequest, response: Response):
user = self._get_user(user_login)
if user is None:
raise HTTPException(status_code=401, detail="User not found or password is incorrect")
token = create_access_token(data={"email": user.email})
response.set_cookie(key="access_token", value=token, httponly=True, secure=self.use_ssl)
token = self._user_login(user_login, response)
return token
async def user_logout(self, response: Response):
response.delete_cookie("access_token")
return {"message": "Logged out"}
def _user_login(self, user_login: UserLoginRequest, response: Response | None) -> str:
user = self._get_user(user_login)
if user is None:
raise HTTPException(status_code=401, detail="User not found or password is incorrect")
token = create_access_token(data={"email": user.email})
if response:
response.set_cookie(key="access_token", value=token, httponly=True, secure=self.use_ssl)
return token
def _get_user(self, user_login: UserLoginRequest) -> UserInfo | None:
user = self._get_functional_account(user_login)
if user is None:

View File

@ -97,3 +97,15 @@ def test_patch_deployment_access(logged_in_client, backend):
"/api/v1/bec_access", params={"deployment_id": deployment_id, "user": user}
)
assert out.status_code == 404
# Test that the access can also be retrieved directly
client, _ = backend
client.post("/api/v1/user/logout")
response = client.post(
"/api/v1/bec_access_login",
json={"username": "admin@bec_atlas.ch", "password": "admin"},
params={"deployment_id": deployment_id, "user": "test1"},
)
assert response.status_code == 200
out = response.json()
assert "token" in out