
Enhance app with active pgroup handling and token updates Added active pgroup state management across the app for user-specific settings. Improved token handling with decoding, saving user data, and setting OpenAPI authorization. Updated components, API calls, and forms to support dynamic pgroup selection and user-specific features.
155 lines
5.0 KiB
Python
155 lines
5.0 KiB
Python
from fastapi import Depends, HTTPException, status, Query
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import or_
|
|
from typing import List
|
|
|
|
from app.routers.auth import get_current_user
|
|
from app.schemas import (
|
|
Address as AddressSchema,
|
|
AddressCreate,
|
|
AddressUpdate,
|
|
loginData,
|
|
)
|
|
from app.models import Address as AddressModel
|
|
from app.dependencies import get_db
|
|
from app.routers.protected_router import protected_router
|
|
|
|
|
|
@protected_router.get("/", response_model=List[AddressSchema])
|
|
async def get_return_addresses(
|
|
active_pgroup: str = Query(...),
|
|
db: Session = Depends(get_db),
|
|
current_user: loginData = Depends(get_current_user),
|
|
):
|
|
if active_pgroup not in current_user.pgroups:
|
|
raise HTTPException(status_code=400, detail="Invalid pgroup provided.")
|
|
|
|
# Return only active addresses
|
|
user_addresses = (
|
|
db.query(AddressModel)
|
|
.filter(
|
|
AddressModel.pgroups.like(f"%{active_pgroup}%"),
|
|
AddressModel.status == "active",
|
|
)
|
|
.all()
|
|
)
|
|
return user_addresses
|
|
|
|
|
|
@protected_router.get("/all", response_model=List[AddressSchema])
|
|
async def get_all_addresses(
|
|
db: Session = Depends(get_db),
|
|
current_user: loginData = Depends(get_current_user),
|
|
):
|
|
# Fetch all active addresses associated with the user's pgroups
|
|
user_pgroups = current_user.pgroups
|
|
filters = [AddressModel.pgroups.like(f"%{pgroup}%") for pgroup in user_pgroups]
|
|
user_addresses = (
|
|
db.query(AddressModel)
|
|
.filter(AddressModel.status == "active", or_(*filters))
|
|
.all()
|
|
)
|
|
return user_addresses
|
|
|
|
|
|
@protected_router.post(
|
|
"/", response_model=AddressSchema, status_code=status.HTTP_201_CREATED
|
|
)
|
|
async def create_return_address(address: AddressCreate, db: Session = Depends(get_db)):
|
|
print("Payload received by backend:", address.dict()) # Log incoming payload
|
|
|
|
if db.query(AddressModel).filter(AddressModel.city == address.city).first():
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Address in this city already exists.",
|
|
)
|
|
|
|
db_address = AddressModel(
|
|
pgroups=address.pgroups,
|
|
house_number=address.house_number,
|
|
street=address.street,
|
|
city=address.city,
|
|
state=address.state,
|
|
zipcode=address.zipcode,
|
|
country=address.country,
|
|
status="active",
|
|
)
|
|
|
|
db.add(db_address)
|
|
db.commit()
|
|
db.refresh(db_address)
|
|
return db_address
|
|
|
|
|
|
@protected_router.put("/{address_id}", response_model=AddressSchema)
|
|
async def update_return_address(
|
|
address_id: int, address: AddressUpdate, db: Session = Depends(get_db)
|
|
):
|
|
# Retrieve the existing address
|
|
db_address = db.query(AddressModel).filter(AddressModel.id == address_id).first()
|
|
if not db_address:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND, detail="Address not found."
|
|
)
|
|
|
|
# Normalize existing and new pgroups (remove whitespace, handle case
|
|
# sensitivity if needed)
|
|
existing_pgroups = (
|
|
set(p.strip() for p in db_address.pgroups.split(",") if p.strip())
|
|
if db_address.pgroups
|
|
else set()
|
|
)
|
|
new_pgroups = (
|
|
set(p.strip() for p in address.pgroups.split(",") if p.strip())
|
|
if address.pgroups
|
|
else set()
|
|
)
|
|
|
|
# Check if any old pgroups are being removed (strict validation against removal)
|
|
if not new_pgroups.issuperset(existing_pgroups):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Modifying pgroups to remove existing ones is not allowed.",
|
|
)
|
|
|
|
# Combine existing and new pgroups (only additions are allowed)
|
|
combined_pgroups = existing_pgroups.union(new_pgroups)
|
|
|
|
# Mark the current address as obsolete
|
|
db_address.status = "inactive"
|
|
db.commit()
|
|
db.refresh(db_address)
|
|
|
|
# Create a new address with updated values and the combined pgroups
|
|
new_address = AddressModel(
|
|
pgroups=",".join(combined_pgroups), # Join set back into comma-separated string
|
|
house_number=address.house_number or db_address.house_number,
|
|
street=address.street or db_address.street,
|
|
city=address.city or db_address.city,
|
|
state=address.state or db_address.state,
|
|
zipcode=address.zipcode or db_address.zipcode,
|
|
country=address.country or db_address.country,
|
|
status="active", # Newly created address will be active
|
|
)
|
|
|
|
# Save the new address
|
|
db.add(new_address)
|
|
db.commit()
|
|
db.refresh(new_address)
|
|
|
|
return new_address
|
|
|
|
|
|
@protected_router.delete("/{address_id}", status_code=status.HTTP_204_NO_CONTENT)
|
|
async def delete_return_address(address_id: int, db: Session = Depends(get_db)):
|
|
db_address = db.query(AddressModel).filter(AddressModel.id == address_id).first()
|
|
if not db_address:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND, detail="Address not found."
|
|
)
|
|
|
|
# Mark the address as obsolete instead of deleting it
|
|
db_address.status = "inactive"
|
|
db.commit()
|
|
return
|