
Replaced usage of "ContactPerson" with "Contact" for consistency across the codebase. Updated related component props, state variables, API calls, and database queries to align with the new model. Also enhanced backend functionality with stricter validations and added support for handling active pgroups in contact management.
163 lines
5.3 KiB
Python
163 lines
5.3 KiB
Python
from fastapi import APIRouter, HTTPException, status, Depends, Query
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import or_
|
|
from typing import List
|
|
|
|
from app.schemas import Contact, ContactCreate, ContactUpdate, loginData
|
|
from app.models import Contact as ContactModel
|
|
from app.dependencies import get_db
|
|
from app.routers.auth import get_current_user
|
|
|
|
# Router instance that the contact endpoints in this module register on
# through the @contact_router.<method>(...) decorators.
contact_router = APIRouter()
|
|
|
|
|
|
# GET /contacts: Retrieve active contacts from the active_pgroup
@contact_router.get("/", response_model=List[Contact])
async def get_contacts(
    active_pgroup: str = Query(...),
    db: Session = Depends(get_db),
    current_user: loginData = Depends(get_current_user),
):
    """Return the active contacts that belong to ``active_pgroup``.

    Args:
        active_pgroup: Required query parameter naming the pgroup to list.
        db: Database session provided by ``get_db``.
        current_user: Authenticated user; ``active_pgroup`` must be one of
            ``current_user.pgroups``.

    Returns:
        The ``ContactModel`` rows whose status is ``"active"`` and whose
        comma-separated ``pgroups`` value contains ``active_pgroup`` as a
        complete entry.

    Raises:
        HTTPException: 400 when ``active_pgroup`` is not among the user's
            pgroups.
    """
    # Validate that the active_pgroup belongs to the user
    if active_pgroup not in current_user.pgroups:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid pgroup provided.",
        )

    # LIKE is only a coarse pre-filter: it matches substrings ("p1" also hits
    # "p12") and treats "%" / "_" in the value as wildcards.
    candidates = (
        db.query(ContactModel)
        .filter(
            ContactModel.pgroups.like(f"%{active_pgroup}%"),
            ContactModel.status == "active",
        )
        .all()
    )

    # pgroups is stored as a comma-separated string, so confirm that the
    # requested pgroup is an exact entry before exposing the contact.
    return [
        candidate
        for candidate in candidates
        if active_pgroup
        in {p.strip() for p in (candidate.pgroups or "").split(",")}
    ]
|
|
|
|
|
|
# GET /contacts/all: Retrieve all contacts from the user's pgroups
@contact_router.get("/all", response_model=List[Contact])
async def get_all_contacts(
    db: Session = Depends(get_db),
    current_user: loginData = Depends(get_current_user),
):
    """Return every contact (any status) in at least one of the user's pgroups.

    Args:
        db: Database session provided by ``get_db``.
        current_user: Authenticated user whose ``pgroups`` scope the result.

    Returns:
        The ``ContactModel`` rows whose comma-separated ``pgroups`` value
        shares at least one complete entry with ``current_user.pgroups``.
        An empty list when the user has no pgroups.
    """
    user_pgroups = set(current_user.pgroups or ())

    # Without this guard the filter list is empty and or_() is called with no
    # arguments. That form is deprecated in SQLAlchemy and adds no WHERE
    # restriction, so a user without pgroups could receive every contact.
    if not user_pgroups:
        return []

    # LIKE is only a coarse pre-filter: it matches substrings ("p1" also hits
    # "p12") and treats "%" / "_" in the value as wildcards.
    filters = [ContactModel.pgroups.like(f"%{pgroup}%") for pgroup in user_pgroups]
    candidates = db.query(ContactModel).filter(or_(*filters)).all()

    # Keep only contacts that share an exact pgroup entry with the user.
    return [
        candidate
        for candidate in candidates
        if not user_pgroups.isdisjoint(
            p.strip() for p in (candidate.pgroups or "").split(",")
        )
    ]
|
|
|
|
|
|
@contact_router.post("/", response_model=Contact, status_code=status.HTTP_201_CREATED)
async def create_contact(
    contact: ContactCreate,  # Body parameter ONLY
    db: Session = Depends(get_db),  # Secondary dependency for database access
):
    """Create a new active contact unless it already exists in the pgroup(s).

    Args:
        contact: Request body with the contact's fields; ``pgroups`` is a
            comma-separated string.
        db: Database session provided by ``get_db``.

    Returns:
        The newly inserted ``ContactModel`` row with status ``"active"``.

    Raises:
        HTTPException: 400 when an active contact with the same email already
            covers every pgroup named in the request.
    """
    # Compare pgroups as sets of exact entries. The previous substring LIKE
    # check treated "a" as present in "ab" and missed "a,b" versus "b,a".
    requested_pgroups = {
        p.strip() for p in (contact.pgroups or "").split(",") if p.strip()
    }

    same_email_contacts = (
        db.query(ContactModel)
        .filter(
            ContactModel.email == contact.email,
            ContactModel.status == "active",
        )
        .all()
    )
    for existing in same_email_contacts:
        existing_pgroups = {
            p.strip() for p in (existing.pgroups or "").split(",") if p.strip()
        }
        # Duplicate when every requested pgroup is already on this contact.
        if requested_pgroups <= existing_pgroups:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="This contact already exists in the provided pgroup.",
            )

    # Create a new contact
    db_contact = ContactModel(
        firstname=contact.firstname,
        lastname=contact.lastname,
        phone_number=contact.phone_number,
        email=contact.email,
        pgroups=contact.pgroups,  # Use the pgroups from the body
        status="active",  # Newly created contacts will be active
    )
    db.add(db_contact)
    try:
        db.commit()
    except Exception:
        # Leave the session usable for whoever handles the error.
        db.rollback()
        raise
    db.refresh(db_contact)
    return db_contact
|
|
|
|
|
|
# PUT /contacts/{contact_id}: Update a contact
@contact_router.put("/{contact_id}", response_model=Contact)
async def update_contact(
    contact_id: int,
    contact: ContactUpdate,
    db: Session = Depends(get_db),
):
    """Replace a contact with an updated copy, keeping the old row as history.

    The existing row is never edited in place apart from its status: it is
    marked ``"inactive"`` and a new ``"active"`` row carrying the updated
    values is inserted. Both changes are committed together.

    Args:
        contact_id: Primary key of the contact to update.
        contact: Request body. Falsy fields fall back to the stored value.
            ``pgroups`` is a comma-separated string; ``None`` keeps the stored
            pgroups unchanged.
        db: Database session provided by ``get_db``.

    Returns:
        The newly inserted active ``ContactModel`` row.

    Raises:
        HTTPException: 404 when no contact has ``contact_id``; 400 when the
            supplied pgroups would drop any pgroup the contact already has.
    """
    # Retrieve the existing contact
    db_contact = db.query(ContactModel).filter(ContactModel.id == contact_id).first()
    if not db_contact:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Contact not found.",
        )

    # Normalize pgroups into sets of stripped, non-empty entries
    existing_pgroups = {
        p.strip() for p in (db_contact.pgroups or "").split(",") if p.strip()
    }
    if contact.pgroups is None:
        # Field omitted: nothing is being removed, so keep the stored pgroups
        # just like the other optional fields fall back to stored values.
        new_pgroups = set(existing_pgroups)
    else:
        new_pgroups = {p.strip() for p in contact.pgroups.split(",") if p.strip()}

    # Check if any old pgroups are being removed (strict validation against removal)
    if not new_pgroups.issuperset(existing_pgroups):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Modifying pgroups to remove existing ones is not allowed.",
        )

    # Build the replacement row. new_pgroups already contains every existing
    # pgroup here; sorting makes the stored string deterministic.
    new_contact = ContactModel(
        firstname=contact.firstname or db_contact.firstname,
        lastname=contact.lastname or db_contact.lastname,
        phone_number=contact.phone_number or db_contact.phone_number,
        email=contact.email or db_contact.email,
        pgroups=",".join(sorted(new_pgroups)),
        status="active",  # Newly created contacts will be active
    )

    # Retire the old row and insert the new one in a single transaction, so a
    # failed insert cannot leave the contact with no active row.
    db_contact.status = "inactive"
    db.add(new_contact)
    try:
        db.commit()
    except Exception:
        db.rollback()
        raise
    db.refresh(new_contact)

    return new_contact
|
|
|
|
|
|
# DELETE /contacts/{contact_id}: Soft-delete a contact by flagging it inactive
@contact_router.delete("/{contact_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_contact(
    contact_id: int,
    db: Session = Depends(get_db),
):
    """Mark the contact identified by ``contact_id`` as inactive.

    The row is not removed from the database; only its ``status`` is set to
    ``"inactive"`` and committed.

    Args:
        contact_id: Primary key of the contact to deactivate.
        db: Database session provided by ``get_db``.

    Raises:
        HTTPException: 404 when no contact has ``contact_id``.
    """
    lookup = db.query(ContactModel).filter(ContactModel.id == contact_id)
    target = lookup.first()

    if target:
        # Flag the row instead of deleting it.
        target.status = "inactive"
        db.commit()
        return None

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Contact not found.",
    )
|