diff --git a/backend/bec_atlas/main.py b/backend/bec_atlas/main.py index 8bb68f0..ca95a8a 100644 --- a/backend/bec_atlas/main.py +++ b/backend/bec_atlas/main.py @@ -88,7 +88,9 @@ class AtlasApp: self.app.include_router(self.deployment_access_router.router, tags=["Deployment Access"]) # BEC Access - self.bec_access_router = BECAccessRouter(prefix=self.prefix, datasources=self.datasources) + self.bec_access_router = BECAccessRouter( + prefix=self.prefix, datasources=self.datasources, app=self + ) self.app.include_router(self.bec_access_router.router, tags=["BEC Access"]) # Session diff --git a/backend/bec_atlas/router/bec_access_router.py b/backend/bec_atlas/router/bec_access_router.py index 974d133..a27c3eb 100644 --- a/backend/bec_atlas/router/bec_access_router.py +++ b/backend/bec_atlas/router/bec_access_router.py @@ -8,14 +8,22 @@ from bec_atlas.authentication import convert_to_user, get_current_user from bec_atlas.datasources.mongodb.mongodb import MongoDBDatasource from bec_atlas.model.model import BECAccessProfile, User from bec_atlas.router.base_router import BaseRouter +from bec_atlas.router.user_router import UserLoginRequest if TYPE_CHECKING: # pragma: no cover from bec_atlas.datasources.datasource_manager import DatasourceManager + from bec_atlas.main import AtlasApp class BECAccessRouter(BaseRouter): - def __init__(self, prefix="/api/v1", datasources: DatasourceManager | None = None): + def __init__( + self, + prefix="/api/v1", + datasources: DatasourceManager | None = None, + app: AtlasApp | None = None, + ): super().__init__(prefix, datasources) + self.app = app if not self.datasources: raise RuntimeError("Datasources not loaded") @@ -28,6 +36,12 @@ class BECAccessRouter(BaseRouter): methods=["GET"], description="Retrieve the access key for a specific deployment and user.", ) + self.router.add_api_route( + "/bec_access_login", + self.get_bec_access_login, + methods=["POST"], + description="Login and retrieve the access key for a specific deployment and user.", + ) @convert_to_user async def get_bec_access( @@ -47,9 +61,41 @@ class BECAccessRouter(BaseRouter): """ if not user: user = current_user.email + out = self._retrieve_access_account(deployment_id, user, current_user=current_user) + return out + + async def get_bec_access_login( + self, user_login: UserLoginRequest, deployment_id: str, user: str + ) -> dict: + """ + Login and retrieve the access key for a specific deployment and user. + Exactly the same as get_bec_access, but with a login step to avoid the need + for an access token only to retrieve the ACL key. + + Args: + deployment_id (str): The deployment id + user (str): The user name to retrieve the access key for + password (str): The password for the user + """ + if not self.app: + raise RuntimeError("App not loaded") + + user_info = self.app.user_router._get_user(user_login) + if not user_info: + raise HTTPException(status_code=404, detail="User not found.") + current_user = self.db.get_user_by_email(user_info.email) + if not current_user: + raise HTTPException(status_code=404, detail="User not found.") + out = self._retrieve_access_account(deployment_id, user, current_user=current_user) + return out + + def _retrieve_access_account( + self, deployment_id: str, username: str, current_user: User + ) -> dict: + out = self.db.find_one( "bec_access_profiles", - {"deployment_id": deployment_id, "username": user}, + {"deployment_id": deployment_id, "username": username}, BECAccessProfile, user=current_user, ) diff --git a/backend/bec_atlas/router/user_router.py b/backend/bec_atlas/router/user_router.py index 1c7400c..2ad9949 100644 --- a/backend/bec_atlas/router/user_router.py +++ b/backend/bec_atlas/router/user_router.py @@ -57,17 +57,22 @@ class UserRouter(BaseRouter): return {"access_token": out, "token_type": "bearer"} async def user_login(self, user_login: UserLoginRequest, response: Response): - user = self._get_user(user_login) - if user is None: - raise HTTPException(status_code=401, detail="User not found or password is incorrect") - token = create_access_token(data={"email": user.email}) - response.set_cookie(key="access_token", value=token, httponly=True, secure=self.use_ssl) + token = self._user_login(user_login, response) return token async def user_logout(self, response: Response): response.delete_cookie("access_token") return {"message": "Logged out"} + def _user_login(self, user_login: UserLoginRequest, response: Response | None) -> str: + user = self._get_user(user_login) + if user is None: + raise HTTPException(status_code=401, detail="User not found or password is incorrect") + token = create_access_token(data={"email": user.email}) + if response: + response.set_cookie(key="access_token", value=token, httponly=True, secure=self.use_ssl) + return token + def _get_user(self, user_login: UserLoginRequest) -> UserInfo | None: user = self._get_functional_account(user_login) if user is None: diff --git a/backend/tests/test_deployment_access_router.py b/backend/tests/test_deployment_access_router.py index 98fe1fd..b8a6220 100644 --- a/backend/tests/test_deployment_access_router.py +++ b/backend/tests/test_deployment_access_router.py @@ -97,3 +97,15 @@ def test_patch_deployment_access(logged_in_client, backend): "/api/v1/bec_access", params={"deployment_id": deployment_id, "user": user} ) assert out.status_code == 404 + + # Test that the access can also be retrieved directly + client, _ = backend + client.post("/api/v1/user/logout") + response = client.post( + "/api/v1/bec_access_login", + json={"username": "admin@bec_atlas.ch", "password": "admin"}, + params={"deployment_id": deployment_id, "user": "test1"}, + ) + assert response.status_code == 200 + out = response.json() + assert "token" in out