2024-12-13 15:37:48 +01:00

83 lines
2.9 KiB
Python

from fastapi import APIRouter, HTTPException, status, Depends
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.security import OAuth2AuthorizationCodeBearer
from app.schemas import loginToken, loginData
import jwt
from datetime import datetime, timedelta, timezone
# Define an APIRouter for authentication
router = APIRouter()
mock_users_db = {
"testuser": {
"username": "testuser",
"password": "testpass", # In a real scenario, store the hash of the password
"pgroups": [20000, 20001, 20003],
}
}
# https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/#hash-and-verify-the-passwords
# SECRET_KEY taken from FastAPI documentation, so not that secret :D
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
oauth2_scheme = OAuth2AuthorizationCodeBearer(authorizationUrl="/login", tokenUrl="/token/login")
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm="HS256")
async def get_current_user(token: str = Depends(oauth2_scheme)) -> loginData:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
token_expired_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token expired",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
pgroups = payload.get("pgroups")
if username is None:
raise credentials_exception
token_data = loginData(username=username, pgroups=pgroups)
except jwt.ExpiredSignatureError:
raise token_expired_exception
except jwt.InvalidTokenError:
raise credentials_exception
return token_data
@router.post("/token/login", response_model=loginToken)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = mock_users_db.get(form_data.username)
if user is None or user["password"] != form_data.password:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Create token
access_token = create_access_token(
data={"sub": user["username"], "pgroups": user["pgroups"]}
)
return loginToken(access_token=access_token, token_type="bearer")
@router.get("/protected-route")
async def read_protected_data(current_user: loginData = Depends(get_current_user)):
return {"username": current_user.username, "pgroups": current_user.pgroups}