mirror of
https://github.com/bec-project/bec_atlas.git
synced 2025-07-14 07:01:48 +02:00
feat: towards a first version
This commit is contained in:
@ -1,6 +1,8 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from typing import Literal
|
||||
from typing import TYPE_CHECKING, Literal, Type, TypeVar
|
||||
|
||||
import pymongo
|
||||
from bec_lib.logger import bec_logger
|
||||
@ -11,6 +13,11 @@ from bec_atlas.model.model import User, UserCredentials
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bson import ObjectId
|
||||
|
||||
T = TypeVar("T", bound=BaseModel)
|
||||
|
||||
|
||||
class MongoDBDatasource:
|
||||
def __init__(self, config: dict) -> None:
|
||||
@ -107,19 +114,19 @@ class MongoDBDatasource:
|
||||
return UserCredentials(**out)
|
||||
|
||||
def find_one(
|
||||
self, collection: str, query_filter: dict, dtype: BaseModel, user: User | None = None
|
||||
) -> BaseModel | None:
|
||||
self, collection: str, query_filter: dict, dtype: Type[T], user: User | None = None
|
||||
) -> T | None:
|
||||
"""
|
||||
Find one document in the collection.
|
||||
|
||||
Args:
|
||||
collection (str): The collection name
|
||||
query_filter (dict): The filter to apply
|
||||
dtype (BaseModel): The data type to return
|
||||
dtype (Type[T]): The data type to return
|
||||
user (User): The user making the request
|
||||
|
||||
Returns:
|
||||
BaseModel: The data type with the document data
|
||||
T: The data type with the document data
|
||||
"""
|
||||
if user is not None:
|
||||
query_filter = self.add_user_filter(user, query_filter)
|
||||
@ -129,15 +136,15 @@ class MongoDBDatasource:
|
||||
return dtype(**out)
|
||||
|
||||
def find(
|
||||
self, collection: str, query_filter: dict, dtype: BaseModel, user: User | None = None
|
||||
) -> list[BaseModel]:
|
||||
self, collection: str, query_filter: dict, dtype: Type[T], user: User | None = None
|
||||
) -> list[T]:
|
||||
"""
|
||||
Find all documents in the collection.
|
||||
|
||||
Args:
|
||||
collection (str): The collection name
|
||||
query_filter (dict): The filter to apply
|
||||
dtype (BaseModel): The data type to return
|
||||
dtype (Type[T]): The data type to return
|
||||
user (User): The user making the request
|
||||
|
||||
Returns:
|
||||
@ -148,20 +155,88 @@ class MongoDBDatasource:
|
||||
out = self.db[collection].find(query_filter)
|
||||
return [dtype(**x) for x in out]
|
||||
|
||||
def post(self, collection: str, data: dict, dtype: Type[T], user: User | None = None) -> T:
|
||||
"""
|
||||
Post a single document to the collection.
|
||||
|
||||
Args:
|
||||
collection (str): The collection name
|
||||
data (dict): The data to insert
|
||||
dtype (Type[T]): The data type to return
|
||||
user (User): The user making the request
|
||||
|
||||
Returns:
|
||||
T: The data type with the document data
|
||||
"""
|
||||
if user is not None:
|
||||
data = self.add_user_filter(user, data, operation="w")
|
||||
out = self.db[collection].insert_one(data)
|
||||
return dtype(**data)
|
||||
|
||||
def patch(
|
||||
self,
|
||||
collection: str,
|
||||
id: ObjectId,
|
||||
update: dict,
|
||||
dtype: Type[T],
|
||||
user: User | None = None,
|
||||
return_document: bool = True,
|
||||
) -> T | None:
|
||||
"""
|
||||
Patch a single document in the collection.
|
||||
|
||||
Args:
|
||||
collection (str): The collection name
|
||||
id (ObjectId): The document id
|
||||
update (dict): The update to apply
|
||||
dtype (Type[T]): The data type to return
|
||||
user (User): The user making the request
|
||||
return_document (bool): When True, return the updated document, otherwise return the original document
|
||||
|
||||
Returns:
|
||||
Type[T]: The data type with the document data
|
||||
"""
|
||||
search_filter = {"_id": id}
|
||||
if user is not None:
|
||||
search_filter = self.add_user_filter(user, search_filter, operation="w")
|
||||
out = self.db[collection].find_one_and_update(
|
||||
filter=search_filter, update={"$set": update}, return_document=return_document
|
||||
)
|
||||
if out is None:
|
||||
return None
|
||||
return dtype(**out)
|
||||
|
||||
def delete_one(self, collection: str, filter: dict, user: User | None = None) -> bool:
|
||||
"""
|
||||
Delete a single document in the collection.
|
||||
|
||||
Args:
|
||||
collection (str): The collection name
|
||||
filter (dict): The filter to apply
|
||||
user (User): The user making the request
|
||||
|
||||
Returns:
|
||||
bool: True if the document was deleted, otherwise False
|
||||
"""
|
||||
if user is not None:
|
||||
filter = self.add_user_filter(user, filter, operation="w")
|
||||
out = self.db[collection].delete_one(filter)
|
||||
return out.deleted_count > 0
|
||||
|
||||
def aggregate(
|
||||
self, collection: str, pipeline: list[dict], dtype: BaseModel, user: User | None = None
|
||||
) -> list[BaseModel]:
|
||||
self, collection: str, pipeline: list[dict], dtype: Type[T], user: User | None = None
|
||||
) -> list[T]:
|
||||
"""
|
||||
Aggregate documents in the collection.
|
||||
|
||||
Args:
|
||||
collection (str): The collection name
|
||||
pipeline (list[dict]): The aggregation pipeline
|
||||
dtype (BaseModel): The data type to return
|
||||
dtype (Type[T]): The data type to return
|
||||
user (User): The user making the request
|
||||
|
||||
Returns:
|
||||
list[BaseModel]: The data type with the document data
|
||||
list[T]: The data type with the document data
|
||||
"""
|
||||
if user is not None:
|
||||
# Add the user filter to the lookup pipeline
|
||||
|
@ -7,7 +7,7 @@ from redis.asyncio import Redis as AsyncRedis
|
||||
from redis.exceptions import AuthenticationError
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_atlas.model.model import Deployments
|
||||
from bec_atlas.model.model import DeploymentCredential
|
||||
|
||||
|
||||
class RedisDatasource:
|
||||
@ -63,31 +63,35 @@ class RedisDatasource:
|
||||
"default", enabled=True, categories=["-@all"], commands=["+auth", "+acl|whoami"]
|
||||
)
|
||||
|
||||
def add_deployment_acl(self, deployment: Deployments):
|
||||
def add_deployment_acl(self, deployment_credential: DeploymentCredential):
|
||||
"""
|
||||
Add ACLs for the deployment.
|
||||
|
||||
Args:
|
||||
deployment (Deployments): The deployment object
|
||||
"""
|
||||
print(f"Adding ACLs for deployment <{deployment.name}>({deployment.id})")
|
||||
print(f"Adding ACLs for deployment {deployment_credential.id}")
|
||||
dep_id = deployment_credential.id
|
||||
dep_key = deployment_credential.credential
|
||||
self.connector._redis_conn.acl_setuser(
|
||||
f"ingestor_{deployment.id}",
|
||||
f"ingestor_{dep_id}",
|
||||
enabled=True,
|
||||
passwords=f"+{deployment.deployment_key}",
|
||||
passwords=f"+{dep_key}",
|
||||
categories=["+@all", "-@dangerous"],
|
||||
keys=[
|
||||
f"internal/deployment/{deployment.id}/*",
|
||||
f"internal/deployment/{deployment.id}/*/state",
|
||||
f"internal/deployment/{deployment.id}/*/data/*",
|
||||
f"internal/deployment/{dep_id}/*",
|
||||
f"internal/deployment/{dep_id}/*/state",
|
||||
f"internal/deployment/{dep_id}/*/data/*",
|
||||
f"internal/deployment/{dep_id}/bec_access",
|
||||
],
|
||||
channels=[
|
||||
f"internal/deployment/{deployment.id}/*/state",
|
||||
f"internal/deployment/{deployment.id}/*",
|
||||
f"internal/deployment/{deployment.id}/request",
|
||||
f"internal/deployment/{deployment.id}/request_response/*",
|
||||
f"internal/deployment/{dep_id}/*/state",
|
||||
f"internal/deployment/{dep_id}/*",
|
||||
f"internal/deployment/{dep_id}/request",
|
||||
f"internal/deployment/{dep_id}/request_response/*",
|
||||
f"internal/deployment/{dep_id}/bec_access",
|
||||
],
|
||||
commands=[f"+keys|internal/deployment/{deployment.id}/*/state"],
|
||||
commands=[f"+keys|internal/deployment/{dep_id}/*/state"],
|
||||
reset_channels=True,
|
||||
reset_keys=True,
|
||||
)
|
||||
|
@ -163,7 +163,7 @@ class DataIngestor:
|
||||
self.handle_message(out, deployment_id)
|
||||
self.redis._redis_conn.xack(stream, "ingestor", message[0])
|
||||
|
||||
def handle_message(self, msg_dict: dict, deploymend_id: str):
|
||||
def handle_message(self, msg_dict: dict, deployment_id: str):
|
||||
"""
|
||||
Handle a message from the Redis queue.
|
||||
|
||||
@ -177,7 +177,7 @@ class DataIngestor:
|
||||
return
|
||||
|
||||
if isinstance(data, messages.ScanStatusMessage):
|
||||
self.update_scan_status(data, deploymend_id)
|
||||
self.update_scan_status(data, deployment_id)
|
||||
|
||||
@lru_cache()
|
||||
def get_default_session_id(self, deployment_id: str):
|
||||
|
@ -1,7 +1,11 @@
|
||||
import uvicorn
|
||||
from fastapi import FastAPI
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
from bec_atlas.datasources.datasource_manager import DatasourceManager
|
||||
from bec_atlas.router.bec_access_router import BECAccessRouter
|
||||
from bec_atlas.router.deployment_access_router import DeploymentAccessRouter
|
||||
from bec_atlas.router.deployment_credentials import DeploymentCredentialsRouter
|
||||
from bec_atlas.router.deployments_router import DeploymentsRouter
|
||||
from bec_atlas.router.realm_router import RealmRouter
|
||||
from bec_atlas.router.redis_router import RedisRouter, RedisWebsocket
|
||||
@ -13,6 +17,8 @@ CONFIG = {
|
||||
"mongodb": {"host": "localhost", "port": 27017},
|
||||
}
|
||||
|
||||
origins = ["http://localhost:4200", "http://localhost"]
|
||||
|
||||
|
||||
class AtlasApp:
|
||||
API_VERSION = "v1"
|
||||
@ -20,6 +26,13 @@ class AtlasApp:
|
||||
def __init__(self, config=None):
|
||||
self.config = config or CONFIG
|
||||
self.app = FastAPI()
|
||||
self.app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=origins,
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
self.server = None
|
||||
self.prefix = f"/api/{self.API_VERSION}"
|
||||
self.datasources = DatasourceManager(config=self.config)
|
||||
@ -43,12 +56,31 @@ class AtlasApp:
|
||||
raise ValueError("Datasources not loaded")
|
||||
self.scan_router = ScanRouter(prefix=self.prefix, datasources=self.datasources)
|
||||
self.app.include_router(self.scan_router.router, tags=["Scan"])
|
||||
|
||||
self.user_router = UserRouter(prefix=self.prefix, datasources=self.datasources)
|
||||
self.app.include_router(self.user_router.router, tags=["User"])
|
||||
|
||||
self.deployment_router = DeploymentsRouter(prefix=self.prefix, datasources=self.datasources)
|
||||
self.app.include_router(self.deployment_router.router, tags=["Deployment"])
|
||||
|
||||
self.deployment_credentials_router = DeploymentCredentialsRouter(
|
||||
prefix=self.prefix, datasources=self.datasources
|
||||
)
|
||||
self.app.include_router(
|
||||
self.deployment_credentials_router.router, tags=["Deployment Credentials"]
|
||||
)
|
||||
|
||||
self.deployment_access_router = DeploymentAccessRouter(
|
||||
prefix=self.prefix, datasources=self.datasources
|
||||
)
|
||||
self.app.include_router(self.deployment_access_router.router, tags=["Deployment Access"])
|
||||
|
||||
self.bec_access_router = BECAccessRouter(prefix=self.prefix, datasources=self.datasources)
|
||||
self.app.include_router(self.bec_access_router.router, tags=["BEC Access"])
|
||||
|
||||
self.realm_router = RealmRouter(prefix=self.prefix, datasources=self.datasources)
|
||||
self.app.include_router(self.realm_router.router, tags=["Realm"])
|
||||
|
||||
self.redis_router = RedisRouter(prefix=self.prefix, datasources=self.datasources)
|
||||
self.app.include_router(self.redis_router.router, tags=["Redis"])
|
||||
|
||||
|
@ -1,6 +1,5 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from typing import Any, Literal
|
||||
|
||||
from bec_lib import messages
|
||||
@ -53,7 +52,6 @@ class UserInfo(BaseModel):
|
||||
class Deployments(MongoBaseModel, AccessProfile):
|
||||
realm_id: str | ObjectId
|
||||
name: str
|
||||
deployment_key: str = Field(default_factory=lambda: str(uuid.uuid4()))
|
||||
active_session_id: str | ObjectId | None = None
|
||||
config_templates: list[str | ObjectId] = []
|
||||
|
||||
@ -61,11 +59,57 @@ class Deployments(MongoBaseModel, AccessProfile):
|
||||
class DeploymentsPartial(MongoBaseModel, AccessProfilePartial):
|
||||
realm_id: str | ObjectId | None = None
|
||||
name: str | None = None
|
||||
deployment_key: str | None = None
|
||||
active_session_id: str | ObjectId | None = None
|
||||
config_templates: list[str | ObjectId] | None = None
|
||||
|
||||
|
||||
class DeploymentCredential(MongoBaseModel):
|
||||
credential: str
|
||||
|
||||
|
||||
class DeploymentAccess(MongoBaseModel, AccessProfile):
|
||||
"""
|
||||
The DeploymentAccess model is used to store the access control
|
||||
lists for the deployment. The access control lists are used to
|
||||
control access to the BEC deployment and contain either user
|
||||
or group names.
|
||||
Once the access control lists are updated, the corresponding
|
||||
BECAccessProfiles for this deployment are updated to reflect
|
||||
the changes.
|
||||
|
||||
Owner: beamline staff
|
||||
"""
|
||||
|
||||
user_read_access: list[str] = []
|
||||
user_write_access: list[str] = []
|
||||
su_read_access: list[str] = []
|
||||
su_write_access: list[str] = []
|
||||
remote_access: list[str] = []
|
||||
|
||||
|
||||
class BECAccessProfile(MongoBaseModel, AccessProfile):
|
||||
"""
|
||||
The BECAccessProfile model is used to store the Redis ACL config
|
||||
for BEC of a user. The username can be either a user or a group.
|
||||
The config fields (categories, keys, channels, commands) are determined
|
||||
based on the access level given through the corresponding DeploymentAccess
|
||||
document.
|
||||
|
||||
Owner: admin
|
||||
Access: user or group matching the username
|
||||
|
||||
"""
|
||||
|
||||
deployment_id: str | ObjectId
|
||||
username: str
|
||||
passwords: dict[str, str] = {}
|
||||
categories: list[str] = []
|
||||
keys: list[str] = []
|
||||
channels: list[str] = []
|
||||
commands: list[str] = []
|
||||
profile: str = ""
|
||||
|
||||
|
||||
class Realm(MongoBaseModel, AccessProfile):
|
||||
realm_id: str
|
||||
deployments: list[Deployments | DeploymentsPartial] = []
|
||||
|
51
backend/bec_atlas/router/bec_access_router.py
Normal file
51
backend/bec_atlas/router/bec_access_router.py
Normal file
@ -0,0 +1,51 @@
|
||||
from bson import ObjectId
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query
|
||||
|
||||
from bec_atlas.authentication import get_current_user
|
||||
from bec_atlas.datasources.mongodb.mongodb import MongoDBDatasource
|
||||
from bec_atlas.model.model import BECAccessProfile, UserInfo
|
||||
from bec_atlas.router.base_router import BaseRouter
|
||||
|
||||
|
||||
class BECAccessRouter(BaseRouter):
|
||||
def __init__(self, prefix="/api/v1", datasources=None):
|
||||
super().__init__(prefix, datasources)
|
||||
self.db: MongoDBDatasource = self.datasources.datasources.get("mongodb")
|
||||
self.router = APIRouter(prefix=prefix)
|
||||
self.router.add_api_route(
|
||||
"/bec_access",
|
||||
self.get_bec_access,
|
||||
methods=["GET"],
|
||||
description="Retrieve the access key for a specific deployment and user.",
|
||||
)
|
||||
|
||||
async def get_bec_access(
|
||||
self,
|
||||
deployment_id: str,
|
||||
user: str = Query(None),
|
||||
current_user: UserInfo = Depends(get_current_user),
|
||||
) -> dict:
|
||||
"""
|
||||
Retrieve the access key for a specific deployment and user.
|
||||
|
||||
Args:
|
||||
deployment_id (str): The deployment id
|
||||
user (str): The user name to retrieve the access key for. If not provided,
|
||||
the access key for the current user will be retrieved.
|
||||
current_user (UserInfo): The current user
|
||||
"""
|
||||
if not user:
|
||||
user = current_user.email
|
||||
out = self.db.find_one(
|
||||
"bec_access_profiles",
|
||||
{"deployment_id": ObjectId(deployment_id), "username": user},
|
||||
BECAccessProfile,
|
||||
user=current_user,
|
||||
)
|
||||
|
||||
if not out:
|
||||
raise HTTPException(status_code=404, detail="Access key not found.")
|
||||
|
||||
# Return the newest access key
|
||||
timestamps = sorted(out.passwords.keys())
|
||||
return {"token": out.passwords[timestamps[-1]]}
|
227
backend/bec_atlas/router/deployment_access_router.py
Normal file
227
backend/bec_atlas/router/deployment_access_router.py
Normal file
@ -0,0 +1,227 @@
|
||||
import secrets
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from bec_lib.endpoints import EndpointInfo, MessageOp
|
||||
from bec_lib.serialization import MsgpackSerialization
|
||||
from bson import ObjectId
|
||||
from fastapi import APIRouter, Depends
|
||||
|
||||
from bec_atlas.authentication import get_current_user
|
||||
from bec_atlas.datasources.mongodb.mongodb import MongoDBDatasource
|
||||
from bec_atlas.model.model import BECAccessProfile, DeploymentAccess, UserInfo
|
||||
from bec_atlas.router.base_router import BaseRouter
|
||||
from bec_atlas.router.redis_router import RedisAtlasEndpoints
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_atlas.datasources.redis_datasource import RedisDatasource
|
||||
|
||||
|
||||
class DeploymentAccessRouter(BaseRouter):
|
||||
def __init__(self, prefix="/api/v1", datasources=None):
|
||||
super().__init__(prefix, datasources)
|
||||
self.db: MongoDBDatasource = self.datasources.datasources.get("mongodb")
|
||||
self.router = APIRouter(prefix=prefix)
|
||||
self.router.add_api_route(
|
||||
"/deployment_access",
|
||||
self.get_deployment_access,
|
||||
methods=["GET"],
|
||||
description="Get the access lists for a specific deployment.",
|
||||
response_model=DeploymentAccess,
|
||||
)
|
||||
self.router.add_api_route(
|
||||
"/deployment_access",
|
||||
self.patch_deployment_access,
|
||||
methods=["PATCH"],
|
||||
description="Update the access lists for a specific deployment.",
|
||||
response_model=DeploymentAccess,
|
||||
)
|
||||
|
||||
async def get_deployment_access(
|
||||
self, deployment_id: str, current_user: UserInfo = Depends(get_current_user)
|
||||
) -> DeploymentAccess:
|
||||
"""
|
||||
Get the access lists for a specific deployment.
|
||||
|
||||
Args:
|
||||
deployment_id (str): The deployment id
|
||||
current_user (UserInfo): The current user
|
||||
|
||||
Returns:
|
||||
DeploymentAccess: The access lists for the deployment
|
||||
"""
|
||||
return self.db.find_one(
|
||||
"deployments", {"_id": ObjectId(deployment_id)}, DeploymentAccess, user=current_user
|
||||
)
|
||||
|
||||
async def patch_deployment_access(
|
||||
self,
|
||||
deployment_id: str,
|
||||
deployment_access: dict,
|
||||
current_user: UserInfo = Depends(get_current_user),
|
||||
) -> DeploymentAccess:
|
||||
"""
|
||||
Update the access lists for a specific deployment.
|
||||
|
||||
Args:
|
||||
deployment_id (str): The deployment id
|
||||
deployment_access (DeploymentAccess): The deployment access object
|
||||
current_user (UserInfo): The current user
|
||||
|
||||
Returns:
|
||||
DeploymentAccess: The updated access lists for the deployment
|
||||
"""
|
||||
deployment_access.pop("_id", None)
|
||||
deployment_access.pop("id", None)
|
||||
deployment_access.pop("owner_groups", None)
|
||||
deployment_access.pop("access_groups", None)
|
||||
original = self.db.find_one(
|
||||
"deployment_access",
|
||||
{"_id": ObjectId(deployment_id)},
|
||||
DeploymentAccess,
|
||||
user=current_user,
|
||||
)
|
||||
out = self.db.patch(
|
||||
collection="deployment_access",
|
||||
id=ObjectId(deployment_id),
|
||||
update=deployment_access,
|
||||
dtype=DeploymentAccess,
|
||||
user=current_user,
|
||||
)
|
||||
self._update_bec_access_profiles(original=original, updated=out)
|
||||
self._refresh_redis_bec_access(deployment_id)
|
||||
return out
|
||||
|
||||
def _update_bec_access_profiles(self, original: DeploymentAccess, updated: DeploymentAccess):
|
||||
"""
|
||||
Update the BEC access profiles in the database. This will not update the redis access.
|
||||
Call _refresh_redis_bec_access to update the redis access.
|
||||
|
||||
Args:
|
||||
deployment_access (DeploymentAccess): The deployment access object
|
||||
"""
|
||||
db: MongoDBDatasource = self.datasources.datasources.get("mongodb")
|
||||
|
||||
new_profiles = set(
|
||||
updated.user_read_access
|
||||
+ updated.user_write_access
|
||||
+ updated.su_read_access
|
||||
+ updated.su_write_access
|
||||
)
|
||||
old_profiles = set(
|
||||
original.user_read_access
|
||||
+ original.user_write_access
|
||||
+ original.su_read_access
|
||||
+ original.su_write_access
|
||||
)
|
||||
removed_profiles = old_profiles - new_profiles
|
||||
for profile in removed_profiles:
|
||||
db.delete_one("bec_access_profiles", {"username": profile, "deployment_id": updated.id})
|
||||
for profile in new_profiles:
|
||||
if profile in updated.su_write_access:
|
||||
access = self._get_redis_access_profile("su_write", profile, updated.id)
|
||||
elif profile in updated.su_read_access:
|
||||
access = self._get_redis_access_profile("su_read", profile, updated.id)
|
||||
elif profile in updated.user_write_access:
|
||||
access = self._get_redis_access_profile("user_write", profile, updated.id)
|
||||
else:
|
||||
access = self._get_redis_access_profile("user_read", profile, updated.id)
|
||||
|
||||
existing_profile = db.find_one(
|
||||
"bec_access_profiles",
|
||||
{"username": profile, "deployment_id": updated.id},
|
||||
BECAccessProfile,
|
||||
)
|
||||
if existing_profile:
|
||||
# access.passwords = existing_profile.passwords
|
||||
db.patch(
|
||||
"bec_access_profiles",
|
||||
existing_profile.id,
|
||||
access.model_dump(exclude_none=True, exclude_defaults=True),
|
||||
BECAccessProfile,
|
||||
)
|
||||
else:
|
||||
access.passwords = {str(time.time()): secrets.token_urlsafe(32)}
|
||||
db.post(
|
||||
"bec_access_profiles", access.model_dump(exclude_none=True), BECAccessProfile
|
||||
)
|
||||
|
||||
def _refresh_redis_bec_access(self, deployment_id: str):
|
||||
"""
|
||||
Refresh the redis BEC access.
|
||||
"""
|
||||
redis: RedisDatasource = self.datasources.datasources.get("redis")
|
||||
db: MongoDBDatasource = self.datasources.datasources.get("mongodb")
|
||||
profiles = db.find(
|
||||
"bec_access_profiles", {"deployment_id": ObjectId(deployment_id)}, BECAccessProfile
|
||||
)
|
||||
profiles = [profile.model_dump(exclude_none=True) for profile in profiles]
|
||||
for profile in profiles:
|
||||
profile.pop("owner_groups", None)
|
||||
profile.pop("access_groups", None)
|
||||
profile.pop("deployment_id", None)
|
||||
profile.pop("_id", None)
|
||||
|
||||
endpoint_info = EndpointInfo(
|
||||
RedisAtlasEndpoints.redis_bec_acl_user(deployment_id), Any, MessageOp.SET_PUBLISH
|
||||
)
|
||||
|
||||
redis.connector.set_and_publish(endpoint_info, MsgpackSerialization.dumps(profiles))
|
||||
|
||||
def _get_redis_access_profile(self, access_profile: str, username: str, deployment_id: str):
|
||||
"""
|
||||
Get the redis access profile.
|
||||
|
||||
Args:
|
||||
access_profile (str): The access profile
|
||||
username (str): The username
|
||||
deployment_id (str): The deployment id
|
||||
|
||||
"""
|
||||
if access_profile == "su_write":
|
||||
return BECAccessProfile(
|
||||
owner_groups=["admin"],
|
||||
access_groups=[username],
|
||||
deployment_id=deployment_id,
|
||||
username=username,
|
||||
categories=["+@all"],
|
||||
keys=["*"],
|
||||
channels=["*"],
|
||||
commands=["+all"],
|
||||
profile="su_write",
|
||||
)
|
||||
if access_profile == "su_read":
|
||||
return BECAccessProfile(
|
||||
owner_groups=["admin"],
|
||||
access_groups=[username],
|
||||
deployment_id=deployment_id,
|
||||
username=username,
|
||||
categories=["+@all", "-@dangerous"],
|
||||
keys=["*"],
|
||||
channels=["*"],
|
||||
commands=["+read"],
|
||||
profile="su_read",
|
||||
)
|
||||
if access_profile == "user_write":
|
||||
return BECAccessProfile(
|
||||
owner_groups=["admin"],
|
||||
access_groups=[username],
|
||||
deployment_id=deployment_id,
|
||||
username=username,
|
||||
categories=["+@all", "-@dangerous"],
|
||||
keys=["*"],
|
||||
channels=["*"],
|
||||
commands=["+write"],
|
||||
profile="user_write",
|
||||
)
|
||||
return BECAccessProfile(
|
||||
owner_groups=["admin"],
|
||||
access_groups=[username],
|
||||
deployment_id=deployment_id,
|
||||
username=username,
|
||||
categories=["+@all", "-@dangerous"],
|
||||
keys=["*"],
|
||||
channels=["*"],
|
||||
commands=["+read"],
|
||||
profile="user_read",
|
||||
)
|
85
backend/bec_atlas/router/deployment_credentials.py
Normal file
85
backend/bec_atlas/router/deployment_credentials.py
Normal file
@ -0,0 +1,85 @@
|
||||
import secrets
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from bson import ObjectId
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
|
||||
from bec_atlas.authentication import get_current_user
|
||||
from bec_atlas.datasources.mongodb.mongodb import MongoDBDatasource
|
||||
from bec_atlas.model.model import DeploymentCredential, UserInfo
|
||||
from bec_atlas.router.base_router import BaseRouter
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_atlas.datasources.redis_datasource import RedisDatasource
|
||||
|
||||
|
||||
class DeploymentCredentialsRouter(BaseRouter):
|
||||
def __init__(self, prefix="/api/v1", datasources=None):
|
||||
super().__init__(prefix, datasources)
|
||||
self.db: MongoDBDatasource = self.datasources.datasources.get("mongodb")
|
||||
self.router = APIRouter(prefix=prefix)
|
||||
self.router.add_api_route(
|
||||
"/deploymentCredentials",
|
||||
self.deployment_credential,
|
||||
methods=["GET"],
|
||||
description="Retrieve the deployment key for a specific deployment.",
|
||||
response_model=DeploymentCredential,
|
||||
)
|
||||
self.router.add_api_route(
|
||||
"/deploymentCredentials/refresh",
|
||||
self.refresh_deployment_credentials,
|
||||
methods=["POST"],
|
||||
description="Refresh the deployment key for a specific deployment.",
|
||||
response_model=DeploymentCredential,
|
||||
)
|
||||
|
||||
async def deployment_credential(
|
||||
self, deployment_id: str, current_user: UserInfo = Depends(get_current_user)
|
||||
) -> DeploymentCredential:
|
||||
"""
|
||||
Get the credentials for a deployment.
|
||||
|
||||
Args:
|
||||
deployment_id (str): The deployment id
|
||||
"""
|
||||
if set(current_user.groups) & set(["admin", "bec_group"]):
|
||||
out = self.db.find(
|
||||
"deployment_credentials", {"_id": ObjectId(deployment_id)}, DeploymentCredential
|
||||
)
|
||||
if len(out) > 0:
|
||||
return out[0]
|
||||
return None
|
||||
|
||||
raise HTTPException(
|
||||
status_code=403, detail="User does not have permission to access this resource."
|
||||
)
|
||||
|
||||
async def refresh_deployment_credentials(
|
||||
self, deployment_id: str, current_user: UserInfo = Depends(get_current_user)
|
||||
):
|
||||
"""
|
||||
Refresh the deployment credentials.
|
||||
|
||||
Args:
|
||||
deployment_id (str): The deployment id
|
||||
|
||||
"""
|
||||
if set(current_user.groups) & set(["admin", "bec_group"]):
|
||||
token = secrets.token_urlsafe(32)
|
||||
out = self.db.patch(
|
||||
"deployment_credentials",
|
||||
id=ObjectId(deployment_id),
|
||||
update={"credential": token},
|
||||
dtype=DeploymentCredential,
|
||||
)
|
||||
if out is None:
|
||||
raise HTTPException(status_code=404, detail="Deployment not found")
|
||||
|
||||
# update the redis deployment key
|
||||
redis: RedisDatasource = self.datasources.datasources.get("redis")
|
||||
redis.add_deployment_acl(out)
|
||||
|
||||
return out
|
||||
raise HTTPException(
|
||||
status_code=403, detail="User does not have permission to access this resource."
|
||||
)
|
@ -5,7 +5,7 @@ from fastapi import APIRouter, Depends
|
||||
|
||||
from bec_atlas.authentication import get_current_user
|
||||
from bec_atlas.datasources.mongodb.mongodb import MongoDBDatasource
|
||||
from bec_atlas.model.model import Deployments, UserInfo
|
||||
from bec_atlas.model.model import DeploymentCredential, Deployments, UserInfo
|
||||
from bec_atlas.router.base_router import BaseRouter
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@ -18,14 +18,14 @@ class DeploymentsRouter(BaseRouter):
|
||||
self.db: MongoDBDatasource = self.datasources.datasources.get("mongodb")
|
||||
self.router = APIRouter(prefix=prefix)
|
||||
self.router.add_api_route(
|
||||
"/deployments/realm/{realm}",
|
||||
"/deployments/realm",
|
||||
self.deployments,
|
||||
methods=["GET"],
|
||||
description="Get all deployments for the realm",
|
||||
response_model=list[Deployments],
|
||||
)
|
||||
self.router.add_api_route(
|
||||
"/deployments/id/{deployment_id}",
|
||||
"/deployments/id",
|
||||
self.deployment_with_id,
|
||||
methods=["GET"],
|
||||
description="Get a single deployment by id for a realm",
|
||||
@ -65,10 +65,11 @@ class DeploymentsRouter(BaseRouter):
|
||||
Update the available deployments.
|
||||
"""
|
||||
self.available_deployments = self.db.find("deployments", {}, Deployments)
|
||||
credentials = self.db.find("deployment_credentials", {}, DeploymentCredential)
|
||||
|
||||
redis: RedisDatasource = self.datasources.datasources.get("redis")
|
||||
msg = json.dumps([msg.model_dump() for msg in self.available_deployments])
|
||||
redis.connector.set_and_publish("deployments", msg)
|
||||
if redis.reconfigured_acls:
|
||||
for deployment in self.available_deployments:
|
||||
for deployment in credentials:
|
||||
redis.add_deployment_acl(deployment)
|
||||
|
@ -20,7 +20,7 @@ class RealmRouter(BaseRouter):
|
||||
response_model_exclude_none=True,
|
||||
)
|
||||
self.router.add_api_route(
|
||||
"/realms/{realm_id}",
|
||||
"/realms/id",
|
||||
self.realm_with_id,
|
||||
methods=["GET"],
|
||||
description="Get a single realm by id",
|
||||
|
@ -95,6 +95,19 @@ class RedisAtlasEndpoints:
|
||||
"""
|
||||
return f"internal/deployment/{deployment}/request_response/{request_id}"
|
||||
|
||||
@staticmethod
|
||||
def redis_bec_acl_user(deployment_id: str):
|
||||
"""
|
||||
Endpoint for the redis BEC ACL user for a deployment.
|
||||
|
||||
Args:
|
||||
deployment_id (str): The deployment id
|
||||
|
||||
Returns:
|
||||
str: The endpoint for the redis BEC ACL user
|
||||
"""
|
||||
return f"internal/deployment/{deployment_id}/bec_access"
|
||||
|
||||
|
||||
class MsgResponse(Response):
|
||||
media_type = "application/json"
|
||||
@ -115,7 +128,7 @@ class RedisRouter(BaseRouter):
|
||||
|
||||
self.router = APIRouter(prefix=prefix)
|
||||
self.router.add_api_route(
|
||||
"/redis/{deployment}", self.redis_get, methods=["GET"], response_class=MsgResponse
|
||||
"/redis", self.redis_get, methods=["GET"], response_class=MsgResponse
|
||||
)
|
||||
self.router.add_api_route("/redis", self.redis_post, methods=["POST"])
|
||||
self.router.add_api_route("/redis", self.redis_delete, methods=["DELETE"])
|
||||
|
@ -12,14 +12,14 @@ class ScanRouter(BaseRouter):
|
||||
self.db: MongoDBDatasource = self.datasources.datasources.get("mongodb")
|
||||
self.router = APIRouter(prefix=prefix)
|
||||
self.router.add_api_route(
|
||||
"/scans/session/{session_id}",
|
||||
"/scans/session",
|
||||
self.scans,
|
||||
methods=["GET"],
|
||||
description="Get all scans for a session",
|
||||
response_model=list[ScanStatus],
|
||||
)
|
||||
self.router.add_api_route(
|
||||
"/scans/id/{scan_id}",
|
||||
"/scans/id",
|
||||
self.scans_with_id,
|
||||
methods=["GET"],
|
||||
description="Get a single scan by id for a session",
|
||||
|
@ -39,13 +39,14 @@ class UserRouter(BaseRouter):
|
||||
return {"access_token": out, "token_type": "bearer"}
|
||||
|
||||
async def user_login(self, user_login: UserLoginRequest):
|
||||
exc = HTTPException(status_code=401, detail="User not found or password is incorrect")
|
||||
user = self.db.get_user_by_email(user_login.username)
|
||||
if user is None:
|
||||
raise HTTPException(status_code=404, detail="User not found")
|
||||
raise exc
|
||||
credentials = self.db.get_user_credentials(user.id)
|
||||
if credentials is None:
|
||||
raise HTTPException(status_code=404, detail="User not found")
|
||||
raise exc
|
||||
if not verify_password(user_login.password, credentials.password):
|
||||
raise HTTPException(status_code=401, detail="Invalid password")
|
||||
raise exc
|
||||
|
||||
return create_access_token(data={"groups": list(user.groups), "email": user.email})
|
||||
|
@ -1,3 +1,5 @@
|
||||
import secrets
|
||||
|
||||
import pymongo
|
||||
|
||||
from bec_atlas.model import Deployments, Realm, Session
|
||||
@ -43,8 +45,29 @@ class DemoSetupLoader:
|
||||
owner_groups=["admin", "demo"],
|
||||
access_groups=["demo"],
|
||||
)
|
||||
if self.db["deployments"].find_one({"name": deployment.name}) is None:
|
||||
existing_deployment = self.db["deployments"].find_one({"name": deployment.name})
|
||||
if existing_deployment is None:
|
||||
self.db["deployments"].insert_one(deployment.__dict__)
|
||||
existing_deployment = self.db["deployments"].find_one({"name": deployment.name})
|
||||
deployment = existing_deployment
|
||||
|
||||
if self.db["deployment_credentials"].find_one({"_id": deployment["_id"]}) is None:
|
||||
deployment_credential = {
|
||||
"_id": deployment["_id"],
|
||||
"credential": secrets.token_urlsafe(32),
|
||||
}
|
||||
self.db["deployment_credentials"].insert_one(deployment_credential)
|
||||
deployment_access = {
|
||||
"_id": deployment["_id"],
|
||||
"owner_groups": ["admin", "demo"],
|
||||
"access_groups": [],
|
||||
"user_read_access": [],
|
||||
"user_write_access": [],
|
||||
"su_read_access": [],
|
||||
"su_write_access": [],
|
||||
"remote_access": [],
|
||||
}
|
||||
self.db["deployment_access"].insert_one(deployment_access)
|
||||
|
||||
if self.db["sessions"].find_one({"name": "_default_"}) is None:
|
||||
deployment = self.db["deployments"].find_one({"name": deployment.name})
|
||||
|
Reference in New Issue
Block a user