refactor: moved demo func accounts to separate file; added cleanup to func account handling

This commit is contained in:
2025-02-07 10:48:41 +01:00
parent 52eb2e661c
commit 1684860db8
2 changed files with 53 additions and 26 deletions

View File

@ -0,0 +1,18 @@
[
{
"email": "admin@bec_atlas.ch",
"password": "admin",
"groups": ["demo", "admin"],
"first_name": "Admin",
"last_name": "Admin",
"owner_groups": ["admin"]
},
{
"email": "jane.doe@bec_atlas.ch",
"password": "atlas",
"groups": ["demo"],
"first_name": "Jane",
"last_name": "Doe",
"owner_groups": ["admin"]
}
]

View File

@ -54,39 +54,43 @@ class MongoDBDatasource:
Load the functional accounts to the database.
"""
functional_accounts_file = os.path.join(
os.path.dirname(__file__), "functional_accounts.json"
os.path.dirname(__file__), ".functional_accounts.json"
)
if not os.path.exists(functional_accounts_file):
raise FileNotFoundError(
f"Could not find functional accounts file at {functional_accounts_file}"
)
with open(functional_accounts_file, "r", encoding="utf-8") as file:
functional_accounts = json.load(file)
for account in functional_accounts:
account["groups"].append("atlas_func_account")
account = list(set(account["groups"]))
existing_accounts = list(
self.db["users"].find({"groups": {"$in": ["atlas_func_account"]}}, {"email": 1})
)
if os.path.exists(functional_accounts_file):
with open(functional_accounts_file, "r", encoding="utf-8") as file:
functional_accounts = json.load(file)
else:
print("Functional accounts file not found. Using default demo accounts.")
# Demo accounts
functional_accounts = [
{
"email": "admin@bec_atlas.ch",
"password": "admin",
"groups": ["demo", "admin"],
"first_name": "Admin",
"last_name": "Admin",
"owner_groups": ["admin"],
},
{
"email": "jane.doe@bec_atlas.ch",
"password": "atlas",
"groups": ["demo"],
"first_name": "Jane",
"last_name": "Doe",
"owner_groups": ["admin"],
},
]
for account in functional_accounts:
# check if the account already exists in the database
password = account.pop("password")
password_hash = get_password_hash(password)
result = self.db["users"].find_one({"email": account["email"]})
result = None
for existing_account in existing_accounts:
if existing_account["email"] == account["email"]:
result = existing_account
existing_accounts.remove(existing_account)
break
if result is not None:
# account already exists; check if the password is the same
credentials = self.db["user_credentials"].find_one({"user_id": result["_id"]})
if credentials is not None and credentials["password"] == password_hash:
continue
# update the password
self.db["user_credentials"].update_one(
{"user_id": result["_id"]}, {"$set": {"password": password_hash}}
)
continue
user = User(**account)
user = self.db["users"].insert_one(user.__dict__)
@ -95,6 +99,11 @@ class MongoDBDatasource:
)
self.db["user_credentials"].insert_one(credentials.__dict__)
# remove any accounts that are no longer in the functional accounts file
for account in existing_accounts:
self.db["users"].delete_one({"_id": account["_id"]})
self.db["user_credentials"].delete_one({"user_id": account["_id"]})
def get_user_by_email(self, email: str) -> User | None:
"""
Get the user from the database.