aaredb/backend/app/routers/address.py
GotthardG 382b1eaba8 Refactor contact handling across backend and frontend
Replaced usage of "ContactPerson" with "Contact" for consistency across the codebase. Updated related component props, state variables, API calls, and database queries to align with the new model. Also enhanced backend functionality with stricter validations and added support for handling active pgroups in contact management.
2025-01-22 16:31:08 +01:00

156 lines
5.0 KiB
Python

from fastapi import Depends, HTTPException, status, Query, APIRouter
from sqlalchemy.orm import Session
from sqlalchemy import or_
from typing import List
from app.routers.auth import get_current_user
from app.schemas import (
Address as AddressSchema,
AddressCreate,
AddressUpdate,
loginData,
)
from app.models import Address as AddressModel
from app.dependencies import get_db
address_router = APIRouter()
@address_router.get("/", response_model=List[AddressSchema])
async def get_return_addresses(
active_pgroup: str = Query(...),
db: Session = Depends(get_db),
current_user: loginData = Depends(get_current_user),
):
if active_pgroup not in current_user.pgroups:
raise HTTPException(status_code=400, detail="Invalid pgroup provided.")
# Return only active addresses
user_addresses = (
db.query(AddressModel)
.filter(
AddressModel.pgroups.like(f"%{active_pgroup}%"),
AddressModel.status == "active",
)
.all()
)
return user_addresses
@address_router.get("/all", response_model=List[AddressSchema])
async def get_all_addresses(
db: Session = Depends(get_db),
current_user: loginData = Depends(get_current_user),
):
# Fetch all active addresses associated with the user's pgroups
user_pgroups = current_user.pgroups
filters = [AddressModel.pgroups.like(f"%{pgroup}%") for pgroup in user_pgroups]
user_addresses = (
db.query(AddressModel)
.filter(AddressModel.status == "active", or_(*filters))
.all()
)
return user_addresses
@address_router.post(
"/", response_model=AddressSchema, status_code=status.HTTP_201_CREATED
)
async def create_return_address(address: AddressCreate, db: Session = Depends(get_db)):
print("Payload received by backend:", address.dict()) # Log incoming payload
if db.query(AddressModel).filter(AddressModel.city == address.city).first():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Address in this city already exists.",
)
db_address = AddressModel(
pgroups=address.pgroups,
house_number=address.house_number,
street=address.street,
city=address.city,
state=address.state,
zipcode=address.zipcode,
country=address.country,
status="active",
)
db.add(db_address)
db.commit()
db.refresh(db_address)
return db_address
@address_router.put("/{address_id}", response_model=AddressSchema)
async def update_return_address(
address_id: int, address: AddressUpdate, db: Session = Depends(get_db)
):
# Retrieve the existing address
db_address = db.query(AddressModel).filter(AddressModel.id == address_id).first()
if not db_address:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Address not found."
)
# Normalize existing and new pgroups (remove whitespace, handle case
# sensitivity if needed)
existing_pgroups = (
set(p.strip() for p in db_address.pgroups.split(",") if p.strip())
if db_address.pgroups
else set()
)
new_pgroups = (
set(p.strip() for p in address.pgroups.split(",") if p.strip())
if address.pgroups
else set()
)
# Check if any old pgroups are being removed (strict validation against removal)
if not new_pgroups.issuperset(existing_pgroups):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Modifying pgroups to remove existing ones is not allowed.",
)
# Combine existing and new pgroups (only additions are allowed)
combined_pgroups = existing_pgroups.union(new_pgroups)
# Mark the current address as obsolete
db_address.status = "inactive"
db.commit()
db.refresh(db_address)
# Create a new address with updated values and the combined pgroups
new_address = AddressModel(
pgroups=",".join(combined_pgroups), # Join set back into comma-separated string
house_number=address.house_number or db_address.house_number,
street=address.street or db_address.street,
city=address.city or db_address.city,
state=address.state or db_address.state,
zipcode=address.zipcode or db_address.zipcode,
country=address.country or db_address.country,
status="active", # Newly created address will be active
)
# Save the new address
db.add(new_address)
db.commit()
db.refresh(new_address)
return new_address
@address_router.delete("/{address_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_return_address(address_id: int, db: Session = Depends(get_db)):
db_address = db.query(AddressModel).filter(AddressModel.id == address_id).first()
if not db_address:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Address not found."
)
# Mark the address as obsolete instead of deleting it
db_address.status = "inactive"
db.commit()
return