GotthardG 6cde57f783 **Commit Message:**
Enhance app with active pgroup handling and token updates

Added active pgroup state management across the app for user-specific settings. Improved token handling with decoding, saving user data, and setting OpenAPI authorization. Updated components, API calls, and forms to support dynamic pgroup selection and user-specific features.
2025-01-22 13:55:26 +01:00

80 lines
2.9 KiB
Python

import os
import secrets
from datetime import datetime, timedelta, timezone

import jwt
from fastapi import APIRouter, HTTPException, status, Depends
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.security import OAuth2AuthorizationCodeBearer

from app.schemas import loginToken, loginData
# Router collecting the authentication endpoints defined in this module
# (the token login route and the protected example route register on it).
router = APIRouter()
# In-memory stand-in for a user store, keyed by username.
# Passwords are kept in plain text for testing only; in a real scenario,
# store the hash of the password instead.
mock_users_db = {
    name: {"username": name, "password": password, "pgroups": pgroups}
    for name, password, pgroups in (
        ("testuser", "testpass", ["p20000", "p20001", "p20002", "p20003"]),
        ("testuser2", "testpass2", ["p20004", "p20005", "p20006"]),
    )
}
# JWT signing configuration, following
# https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/#hash-and-verify-the-passwords
#
# The fallback key below is the example key from the FastAPI documentation, so
# it is publicly known and anyone can forge tokens signed with it. It is kept
# only so local development keeps working unchanged. For any real deployment
# set the JWT_SECRET_KEY environment variable, e.g. to the output of
#     openssl rand -hex 32
# An unset or empty variable falls back to the example key.
SECRET_KEY = (
    os.environ.get("JWT_SECRET_KEY")
    or "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
)
# JWT signing algorithm identifier (HMAC with SHA-256).
ALGORITHM = "HS256"
# Lifetime of an issued access token, in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Security scheme whose dependency supplies the `token` argument of
# get_current_user.
# NOTE(review): this declares an authorization-code flow, while the
# /token/login endpoint in this module accepts a username/password form.
# Confirm the scheme type is what the OpenAPI docs/clients are meant to see.
oauth2_scheme = OAuth2AuthorizationCodeBearer(
    tokenUrl="/token/login",
    authorizationUrl="/login",
)
def create_access_token(data: dict) -> str:
    """Return a signed JWT carrying ``data`` plus an expiry claim.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's dict is not modified.

    Returns:
        The encoded token, signed with ``SECRET_KEY`` using ``ALGORITHM``.
        Its ``exp`` claim is ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes after
        the current UTC time.
    """
    expire = datetime.now(timezone.utc) + timedelta(
        minutes=ACCESS_TOKEN_EXPIRE_MINUTES
    )
    # Build a fresh dict; the "exp" set here overrides any "exp" in ``data``.
    to_encode = {**data, "exp": expire}
    # Sign with the shared ALGORITHM constant (not a literal) so encoding can
    # never drift from the algorithm the decoder accepts.
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)) -> loginData:
    """Resolve the request's bearer token into a ``loginData`` object.

    Args:
        token: Encoded JWT, injected through ``oauth2_scheme``.

    Returns:
        ``loginData`` built from the token's ``sub`` claim (username) and
        its ``pgroups`` claim.

    Raises:
        HTTPException: 401 if the token has expired, fails validation, or
            carries no ``sub`` claim.
    """
    # Only the decode call can raise the jwt errors, so keep the try minimal.
    # ExpiredSignatureError must be handled before InvalidTokenError, which
    # is its base class in PyJWT.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError:
        print("[DEBUG] Token expired")
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        print("[DEBUG] Invalid token")
        raise HTTPException(status_code=401, detail="Invalid token")

    username = payload.get("sub")
    print(f"[DEBUG] Username decoded from token: {username}")  # Add debug log here
    if username is None:
        # A correctly signed token without a subject identifies nobody;
        # reject it instead of passing username=None on to loginData.
        print("[DEBUG] Invalid token")
        raise HTTPException(status_code=401, detail="Invalid token")
    return loginData(username=username, pgroups=payload.get("pgroups"))
@router.post("/token/login", response_model=loginToken)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Authenticate against ``mock_users_db`` and issue an access token.

    Args:
        form_data: Submitted login form; its ``username`` and ``password``
            fields are read.

    Returns:
        ``loginToken`` holding the signed JWT and ``token_type`` "bearer".
        The token carries the username as ``sub`` and the user's ``pgroups``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the user is unknown or the password does not match.
    """
    user = mock_users_db.get(form_data.username)
    # compare_digest does not short-circuit at the first differing character,
    # unlike ``!=``, so response time does not leak how much of the password
    # matched. Bytes are compared because the str form accepts ASCII only.
    credentials_ok = user is not None and secrets.compare_digest(
        user["password"].encode("utf-8"),
        form_data.password.encode("utf-8"),
    )
    if not credentials_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # Create token
    access_token = create_access_token(
        data={"sub": user["username"], "pgroups": user["pgroups"]}
    )
    return loginToken(access_token=access_token, token_type="bearer")
@router.get("/protected-route")
async def read_protected_data(current_user: loginData = Depends(get_current_user)):
    """Return the authenticated caller's username and pgroups.

    ``current_user`` is resolved by ``get_current_user``, so this handler
    only runs for requests whose token was accepted there.
    """
    name = current_user.username
    groups = current_user.pgroups
    return dict(username=name, pgroups=groups)