Better integration of sqlite3 database
This commit is contained in:
@ -1,26 +1,32 @@
|
||||
from fastapi import APIRouter, HTTPException, status
|
||||
from typing import List, Optional
|
||||
from app.data.data import return_addresses
|
||||
from app.models import Address # Import the Address model
|
||||
|
||||
from fastapi import APIRouter, HTTPException, status, Depends
|
||||
from sqlalchemy.orm import Session
|
||||
from typing import List
|
||||
from app.schemas import Address as AddressSchema, AddressCreate
|
||||
from app.models import Address as AddressModel
|
||||
from app.dependencies import get_db
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@router.get("/", response_model=List[Address])
|
||||
async def get_return_addresses():
|
||||
return return_addresses
|
||||
@router.get("/", response_model=List[AddressSchema])
|
||||
async def get_return_addresses(db: Session = Depends(get_db)):
|
||||
return db.query(AddressModel).all()
|
||||
|
||||
@router.post("/", response_model=Address, status_code=status.HTTP_201_CREATED)
|
||||
async def create_return_address(address: Address):
|
||||
if any(a.city == address.city for a in return_addresses):
|
||||
@router.post("/", response_model=AddressSchema, status_code=status.HTTP_201_CREATED)
|
||||
async def create_return_address(address: AddressCreate, db: Session = Depends(get_db)):
|
||||
if db.query(AddressModel).filter(AddressModel.city == address.city).first():
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="Address in this city already exists."
|
||||
)
|
||||
if return_addresses:
|
||||
max_id = max(a.id for a in return_addresses)
|
||||
address.id = max_id + 1 if address.id is None else address.id
|
||||
else:
|
||||
address.id = 1 if address.id is None else address.id
|
||||
return_addresses.append(address)
|
||||
return address
|
||||
|
||||
db_address = AddressModel(
|
||||
street=address.street,
|
||||
city=address.city,
|
||||
zipcode=address.zipcode,
|
||||
country=address.country
|
||||
)
|
||||
|
||||
db.add(db_address)
|
||||
db.commit()
|
||||
db.refresh(db_address)
|
||||
return db_address
|
@ -1,26 +1,31 @@
|
||||
# app/routers/contact.py
|
||||
from fastapi import APIRouter, HTTPException, status
|
||||
from fastapi import APIRouter, HTTPException, status, Depends
|
||||
from sqlalchemy.orm import Session
|
||||
from typing import List
|
||||
from app.data.data import contacts
|
||||
from app.models import ContactPerson
|
||||
from app.schemas import ContactPerson, ContactPersonCreate
|
||||
from app.models import ContactPerson as ContactPersonModel
|
||||
from app.dependencies import get_db
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@router.get("/", response_model=List[ContactPerson])
|
||||
async def get_contacts():
|
||||
return contacts
|
||||
async def get_contacts(db: Session = Depends(get_db)):
|
||||
return db.query(ContactPersonModel).all()
|
||||
|
||||
@router.post("/", response_model=ContactPerson, status_code=status.HTTP_201_CREATED)
|
||||
async def create_contact(contact: ContactPerson):
|
||||
if any(c.email == contact.email for c in contacts):
|
||||
async def create_contact(contact: ContactPersonCreate, db: Session = Depends(get_db)):
|
||||
if db.query(ContactPersonModel).filter(ContactPersonModel.email == contact.email).first():
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="This contact already exists."
|
||||
)
|
||||
if contacts:
|
||||
max_id = max(c.id for c in contacts)
|
||||
contact.id = max_id + 1 if contact.id is None else contact.id
|
||||
else:
|
||||
contact.id = 1 if contact.id is None else contact.id
|
||||
contacts.append(contact)
|
||||
return contact
|
||||
|
||||
db_contact = ContactPersonModel(
|
||||
firstname=contact.firstname,
|
||||
lastname=contact.lastname,
|
||||
phone_number=contact.phone_number,
|
||||
email=contact.email
|
||||
)
|
||||
db.add(db_contact)
|
||||
db.commit()
|
||||
db.refresh(db_contact)
|
||||
return db_contact
|
@ -1,19 +1,38 @@
|
||||
from fastapi import APIRouter, HTTPException, status
|
||||
from typing import List, Optional
|
||||
from fastapi import APIRouter, HTTPException, status, Depends
|
||||
from sqlalchemy.orm import Session
|
||||
from typing import List
|
||||
import uuid
|
||||
from app.data.data import dewars, contacts, return_addresses
|
||||
from app.models import Dewar, ContactPerson, Address
|
||||
|
||||
from app.schemas import Dewar as DewarSchema, DewarCreate
|
||||
from app.models import Dewar as DewarModel
|
||||
from app.dependencies import get_db
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@router.get("/", response_model=List[Dewar])
|
||||
async def get_dewars():
|
||||
return dewars
|
||||
@router.get("/", response_model=List[DewarSchema])
|
||||
async def get_dewars(db: Session = Depends(get_db)):
|
||||
return db.query(DewarModel).all()
|
||||
|
||||
@router.post("/", response_model=Dewar, status_code=status.HTTP_201_CREATED)
|
||||
async def create_dewar(dewar: Dewar) -> Dewar:
|
||||
@router.post("/", response_model=DewarSchema, status_code=status.HTTP_201_CREATED)
|
||||
async def create_dewar(dewar: DewarCreate, db: Session = Depends(get_db)) -> DewarSchema:
|
||||
dewar_id = f'DEWAR-{uuid.uuid4().hex[:8].upper()}'
|
||||
dewar.id = dewar_id
|
||||
dewars.append(dewar)
|
||||
return dewar
|
||||
|
||||
db_dewar = DewarModel(
|
||||
id=dewar_id,
|
||||
dewar_name=dewar.dewar_name,
|
||||
tracking_number=dewar.tracking_number,
|
||||
number_of_pucks=dewar.number_of_pucks,
|
||||
number_of_samples=dewar.number_of_samples,
|
||||
status=dewar.status,
|
||||
ready_date=dewar.ready_date,
|
||||
shipping_date=dewar.shipping_date,
|
||||
arrival_date=dewar.arrival_date,
|
||||
returning_date=dewar.returning_date,
|
||||
qrcode=dewar.qrcode,
|
||||
contact_person_id=dewar.contact_person_id,
|
||||
return_address_id=dewar.return_address_id
|
||||
)
|
||||
|
||||
db.add(db_dewar)
|
||||
db.commit()
|
||||
db.refresh(db_dewar)
|
||||
return db_dewar
|
@ -1,11 +1,13 @@
|
||||
from fastapi import APIRouter
|
||||
from typing import List, Optional
|
||||
from app.data.data import proposals
|
||||
from app.models import Proposal # Import the Address model
|
||||
|
||||
# app/routers/proposal.py
|
||||
from fastapi import APIRouter, Depends
|
||||
from sqlalchemy.orm import Session
|
||||
from typing import List
|
||||
from app.schemas import Proposal as ProposalSchema
|
||||
from app.models import Proposal as ProposalModel
|
||||
from app.dependencies import get_db
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@router.get("/", response_model=List[Proposal])
|
||||
async def get_proposals():
|
||||
return proposals
|
||||
@router.get("/", response_model=List[ProposalSchema])
|
||||
async def get_proposals(db: Session = Depends(get_db)):
|
||||
return db.query(ProposalModel).all()
|
@ -1,85 +1,154 @@
|
||||
from fastapi import APIRouter, HTTPException, status, Query
|
||||
from fastapi import APIRouter, HTTPException, status, Query, Depends
|
||||
from sqlalchemy.orm import Session
|
||||
from typing import List, Optional
|
||||
import uuid
|
||||
from app.data.data import shipments, dewars
|
||||
from app.models import Shipment, Dewar, ContactPerson, Address, Proposal
|
||||
|
||||
from app.schemas import ShipmentCreate, Shipment as ShipmentSchema, ContactPerson as ContactPersonSchema
|
||||
from app.database import get_db
|
||||
from app.crud import get_shipments, get_shipment_by_id
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@router.get("/", response_model=List[Shipment])
|
||||
async def get_shipments(shipment_id: Optional[str] = Query(None, description="ID of the specific shipment to retrieve")):
|
||||
|
||||
@router.get("", response_model=List[ShipmentSchema])
|
||||
async def fetch_shipments(shipment_id: Optional[str] = Query(None),
|
||||
db: Session = Depends(get_db)):
|
||||
if shipment_id:
|
||||
shipment = next((sh for sh in shipments if sh.shipment_id == shipment_id), None)
|
||||
shipment = get_shipment_by_id(db, shipment_id)
|
||||
if not shipment:
|
||||
raise HTTPException(status_code=404, detail="Shipment not found")
|
||||
return [shipment]
|
||||
return shipments
|
||||
return get_shipments(db)
|
||||
|
||||
|
||||
@router.post("", response_model=ShipmentSchema, status_code=status.HTTP_201_CREATED)
|
||||
async def create_shipment(shipment: ShipmentCreate, db: Session = Depends(get_db)):
|
||||
from app.models import Shipment as ShipmentModel, ContactPerson as ContactPersonModel, Address as AddressModel, \
|
||||
Proposal as ProposalModel, Dewar as DewarModel
|
||||
|
||||
contact_person = db.query(ContactPersonModel).filter(ContactPersonModel.id == shipment.contact_person_id).first()
|
||||
return_address = db.query(AddressModel).filter(AddressModel.id == shipment.return_address_id).first()
|
||||
proposal = db.query(ProposalModel).filter(ProposalModel.id == shipment.proposal_id).first()
|
||||
|
||||
if not (contact_person and return_address and proposal):
|
||||
raise HTTPException(status_code=404, detail="Associated entity not found")
|
||||
|
||||
shipment_id = f'SHIP-{uuid.uuid4().hex[:8].upper()}'
|
||||
db_shipment = ShipmentModel(
|
||||
shipment_id=shipment_id,
|
||||
shipment_name=shipment.shipment_name,
|
||||
shipment_date=shipment.shipment_date,
|
||||
shipment_status=shipment.shipment_status,
|
||||
comments=shipment.comments,
|
||||
contact_person_id=contact_person.id,
|
||||
return_address_id=return_address.id,
|
||||
proposal_id=proposal.id,
|
||||
)
|
||||
|
||||
# Handling dewars association
|
||||
if shipment.dewars:
|
||||
dewars = db.query(DewarModel).filter(DewarModel.id.in_(shipment.dewars)).all()
|
||||
if len(dewars) != len(shipment.dewars):
|
||||
raise HTTPException(status_code=404, detail="One or more dewars not found")
|
||||
db_shipment.dewars.extend(dewars)
|
||||
|
||||
db.add(db_shipment)
|
||||
db.commit()
|
||||
db.refresh(db_shipment)
|
||||
|
||||
return db_shipment
|
||||
|
||||
|
||||
@router.delete("/{shipment_id}", status_code=status.HTTP_204_NO_CONTENT)
|
||||
async def delete_shipment(shipment_id: str):
|
||||
global shipments
|
||||
shipments = [shipment for shipment in shipments if shipment.shipment_id != shipment_id]
|
||||
|
||||
@router.post("/{shipment_id}/add_dewar", response_model=Shipment)
|
||||
async def add_dewar_to_shipment(shipment_id: str, dewar_id: str):
|
||||
shipment = next((sh for sh in shipments if sh.shipment_id == shipment_id), None)
|
||||
async def delete_shipment(shipment_id: str, db: Session = Depends(get_db)):
|
||||
from app.models import Shipment as ShipmentModel
|
||||
shipment = db.query(ShipmentModel).filter(ShipmentModel.shipment_id == shipment_id).first()
|
||||
if not shipment:
|
||||
raise HTTPException(status_code=404, detail="Shipment not found")
|
||||
dewar = next((dw for dw in dewars if dw.id == dewar_id), None)
|
||||
db.delete(shipment)
|
||||
db.commit()
|
||||
return
|
||||
|
||||
|
||||
@router.put("/shipments/{shipment_id}", response_model=ShipmentSchema)
|
||||
async def update_shipment(shipment_id: str, updated_shipment: ShipmentCreate, db: Session = Depends(get_db)):
|
||||
from app.models import Shipment as ShipmentModel, ContactPerson as ContactPersonModel, Address as AddressModel, \
|
||||
Dewar as DewarModel
|
||||
|
||||
# Log incoming payload for detailed inspection
|
||||
print("Received payload:", json.dumps(updated_shipment.dict(), indent=2))
|
||||
|
||||
try:
|
||||
shipment = db.query(ShipmentModel).filter(ShipmentModel.shipment_id == shipment_id).first()
|
||||
if not shipment:
|
||||
raise HTTPException(status_code=404, detail="Shipment not found")
|
||||
|
||||
# Validate relationships by IDs
|
||||
contact_person = db.query(ContactPersonModel).filter(
|
||||
ContactPersonModel.id == updated_shipment.contact_person_id).first()
|
||||
return_address = db.query(AddressModel).filter(AddressModel.id == updated_shipment.return_address_id).first()
|
||||
if not contact_person:
|
||||
raise HTTPException(status_code=404, detail="Contact person not found")
|
||||
if not return_address:
|
||||
raise HTTPException(status_code=404, detail="Return address not found")
|
||||
|
||||
# Handling dewars association by IDs
|
||||
dewars_ids = [d['id'] for d in updated_shipment.dewars]
|
||||
dewars = db.query(DewarModel).filter(DewarModel.id.in_(dewars_ids)).all()
|
||||
if len(dewars) != len(dewars_ids):
|
||||
raise HTTPException(status_code=422, detail="One or more dewars not found")
|
||||
|
||||
# Update shipment details
|
||||
shipment.shipment_name = updated_shipment.shipment_name
|
||||
shipment.shipment_date = updated_shipment.shipment_date
|
||||
shipment.shipment_status = updated_shipment.shipment_status
|
||||
shipment.comments = updated_shipment.comments
|
||||
shipment.contact_person_id = updated_shipment.contact_person_id
|
||||
shipment.return_address_id = updated_shipment.return_address_id
|
||||
shipment.dewars = dewars
|
||||
|
||||
db.commit()
|
||||
db.refresh(shipment)
|
||||
|
||||
return shipment
|
||||
|
||||
except Exception as e:
|
||||
print(f"Update failed with exception: {e}")
|
||||
raise HTTPException(status_code=500, detail="Internal server error")
|
||||
|
||||
|
||||
@router.post("/{shipment_id}/add_dewar", response_model=ShipmentSchema)
|
||||
async def add_dewar_to_shipment(shipment_id: str, dewar_id: str, db: Session = Depends(get_db)):
|
||||
from app.models import Shipment as ShipmentModel, Dewar as DewarModel
|
||||
shipment = db.query(ShipmentModel).filter(ShipmentModel.shipment_id == shipment_id).first()
|
||||
if not shipment:
|
||||
raise HTTPException(status_code=404, detail="Shipment not found")
|
||||
dewar = db.query(DewarModel).filter(DewarModel.id == dewar_id).first()
|
||||
if not dewar:
|
||||
raise HTTPException(status_code=404, detail="Dewar not found")
|
||||
|
||||
if dewar not in shipment.dewars:
|
||||
shipment.dewars.append(dewar)
|
||||
db.commit()
|
||||
db.refresh(shipment)
|
||||
return shipment
|
||||
|
||||
@router.put("/{shipment_id}", response_model=Shipment)
|
||||
async def update_shipment(shipment_id: str, updated_shipment: Shipment):
|
||||
global shipments
|
||||
shipment = next((sh for sh in shipments if sh.shipment_id == shipment_id), None)
|
||||
|
||||
@router.delete("/{shipment_id}/remove_dewar/{dewar_id}", response_model=ShipmentSchema)
|
||||
async def remove_dewar_from_shipment(shipment_id: str, dewar_id: str, db: Session = Depends(get_db)):
|
||||
from app.models import Shipment as ShipmentModel, Dewar as DewarModel
|
||||
shipment = db.query(ShipmentModel).filter(ShipmentModel.shipment_id == shipment_id).first()
|
||||
if not shipment:
|
||||
raise HTTPException(status_code=404, detail="Shipment not found")
|
||||
shipment.shipment_name = updated_shipment.shipment_name
|
||||
shipment.shipment_date = updated_shipment.shipment_date
|
||||
shipment.shipment_status = updated_shipment.shipment_status
|
||||
shipment.contact_person = updated_shipment.contact_person
|
||||
shipment.proposal_number = updated_shipment.proposal_number
|
||||
shipment.return_address = updated_shipment.return_address
|
||||
shipment.comments = updated_shipment.comments
|
||||
|
||||
existing_dewar_dict = {dewar.id: dewar for dewar in shipment.dewars}
|
||||
for updated_dewar in updated_shipment.dewars:
|
||||
if updated_dewar.id in existing_dewar_dict:
|
||||
existing_dewar_dict[updated_dewar.id].dewar_name = updated_dewar.dewar_name
|
||||
existing_dewar_dict[updated_dewar.id].tracking_number = updated_dewar.tracking_number
|
||||
existing_dewar_dict[updated_dewar.id].number_of_pucks = updated_dewar.number_of_pucks
|
||||
existing_dewar_dict[updated_dewar.id].number_of_samples = updated_dewar.number_of_samples
|
||||
existing_dewar_dict[updated_dewar.id].return_address = updated_dewar.return_address
|
||||
existing_dewar_dict[updated_dewar.id].contact_person = updated_dewar.contact_person
|
||||
existing_dewar_dict[updated_dewar.id].status = updated_dewar.status
|
||||
existing_dewar_dict[updated_dewar.id].ready_date = updated_dewar.ready_date
|
||||
existing_dewar_dict[updated_dewar.id].shipping_date = updated_dewar.shipping_date
|
||||
existing_dewar_dict[updated_dewar.id].arrival_date = updated_dewar.arrival_date
|
||||
existing_dewar_dict[updated_dewar.id].returning_date = updated_dewar.returning_date
|
||||
existing_dewar_dict[updated_dewar.id].qrcode = updated_dewar.qrcode
|
||||
else:
|
||||
shipment.dewars.append(updated_dewar)
|
||||
return shipment
|
||||
|
||||
@router.post("/", response_model=Shipment, status_code=status.HTTP_201_CREATED)
|
||||
async def create_shipment(shipment: Shipment):
|
||||
shipment_id = f'SHIP-{uuid.uuid4().hex[:8].upper()}'
|
||||
shipment.shipment_id = shipment_id
|
||||
shipments.append(shipment)
|
||||
return shipment
|
||||
|
||||
@router.delete("/{shipment_id}/remove_dewar/{dewar_id}", response_model=Shipment)
|
||||
async def remove_dewar_from_shipment(shipment_id: str, dewar_id: str):
|
||||
shipment = next((sh for sh in shipments if sh.shipment_id == shipment_id), None)
|
||||
if not shipment:
|
||||
raise HTTPException(status_code=404, detail="Shipment not found")
|
||||
shipment.dewars = [dw for dw in shipment.dewars if dw.id != dewar_id]
|
||||
db.commit()
|
||||
db.refresh(shipment)
|
||||
return shipment
|
||||
|
||||
@router.get("/contact_persons", response_model=List[ContactPerson])
|
||||
async def get_shipment_contact_persons():
|
||||
return [{"shipment_id": shipment.shipment_id, "contact_person": shipment.get_shipment_contact_persons()} for shipment in shipments]
|
||||
|
||||
@router.get("/contact_persons", response_model=List[ContactPersonSchema])
|
||||
async def get_shipment_contact_persons(db: Session = Depends(get_db)):
|
||||
from app.models import ContactPerson as ContactPersonModel
|
||||
contact_persons = db.query(ContactPersonModel).all()
|
||||
return contact_persons
|
Reference in New Issue
Block a user