added login page and started integrated of security

This commit is contained in:
GotthardG
2024-12-03 23:01:38 +01:00
parent 1798c480f6
commit 0d1374ded7
8 changed files with 302 additions and 101 deletions

View File

@ -5,8 +5,7 @@ from fastapi.middleware.cors import CORSMiddleware
from app import ssl_heidi
from pathlib import Path
from app.routers import address, contact, proposal, dewar, shipment, puck, spreadsheet, logistics
from app.routers import address, contact, proposal, dewar, shipment, puck, spreadsheet, logistics, auth
from app.database import Base, engine, SessionLocal, load_sample_data
app = FastAPI()
@ -40,6 +39,7 @@ def on_startup():
# Include routers with correct configuration
app.include_router(auth.router, prefix="/auth", tags=["auth"])
app.include_router(contact.router, prefix="/contacts", tags=["contacts"])
app.include_router(address.router, prefix="/addresses", tags=["addresses"])
app.include_router(proposal.router, prefix="/proposals", tags=["proposals"])

View File

@ -3,5 +3,6 @@ from .contact import router as contact_router
from .proposal import router as proposal_router
from .dewar import router as dewar_router
from .shipment import router as shipment_router
from .auth import router as auth_router
__all__ = ["address_router", "contact_router", "proposal_router", "dewar_router", "shipment_router"]
__all__ = ["address_router", "contact_router", "proposal_router", "dewar_router", "shipment_router", "auth_router"]

View File

@ -0,0 +1,82 @@
from fastapi import APIRouter, HTTPException, status, Depends
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.security import OAuth2AuthorizationCodeBearer
from app.schemas import loginToken, loginData
import jwt
from datetime import datetime, timedelta, timezone
# Define an APIRouter for authentication
router = APIRouter()
mock_users_db = {
"testuser": {
"username": "testuser",
"password": "testpass", # In a real scenario, store the hash of the password
"pgroups": [20000, 20001, 20003],
}
}
# https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/#hash-and-verify-the-passwords
# SECRET_KEY taken from FastAPI documentation, so not that secret :D
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
oauth2_scheme = OAuth2AuthorizationCodeBearer(authorizationUrl="/login", tokenUrl="/token/login")
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)) -> loginData:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
token_expired_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token expired",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
pgroups = payload.get("pgroups")
if username is None:
raise credentials_exception
token_data = loginData(username=username, pgroups=pgroups)
except jwt.ExpiredSignatureError:
raise token_expired_exception
except jwt.InvalidTokenError:
raise credentials_exception
return token_data
@router.post("/token/login", response_model=loginToken)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = mock_users_db.get(form_data.username)
if user is None or user["password"] != form_data.password:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Create token
access_token = create_access_token(
data={"sub": user["username"], "pgroups": user["pgroups"]}
)
return loginToken(access_token=access_token, token_type="bearer")
@router.get("/protected-route")
async def read_protected_data(current_user: loginData = Depends(get_current_user)):
return {"username": current_user.username, "pgroups": current_user.pgroups}