
Updated the `number_of_pucks` and `number_of_samples` fields in `schemas.py` to be optional for greater flexibility. Simplified the test Jupyter notebook by restructuring imports and consolidating function calls for better readability and maintainability.
668 lines
22 KiB
Python
668 lines
22 KiB
Python
import os
|
|
import tempfile
|
|
import time
|
|
import random
|
|
import hashlib
|
|
from fastapi import APIRouter, HTTPException, status, Depends, Response
|
|
from sqlalchemy.orm import Session, joinedload
|
|
from typing import List
|
|
import logging
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from app.schemas import (
|
|
Dewar as DewarSchema,
|
|
DewarCreate,
|
|
DewarUpdate,
|
|
DewarType as DewarTypeSchema,
|
|
DewarTypeCreate,
|
|
DewarSerialNumber as DewarSerialNumberSchema,
|
|
DewarSerialNumberCreate,
|
|
Shipment as ShipmentSchema,
|
|
Dewar,
|
|
SampleUpdate,
|
|
Sample,
|
|
Puck,
|
|
SampleEventResponse, # Clearer name for schema
|
|
)
|
|
from app.models import (
|
|
Dewar as DewarModel,
|
|
Puck as PuckModel,
|
|
Sample as SampleModel,
|
|
DewarType as DewarTypeModel,
|
|
DewarSerialNumber as DewarSerialNumberModel,
|
|
LogisticsEvent,
|
|
PuckEvent,
|
|
SampleEvent,
|
|
)
|
|
from app.dependencies import get_db
|
|
import qrcode
|
|
import io
|
|
from io import BytesIO
|
|
from PIL import Image
|
|
from reportlab.lib.pagesizes import A5, landscape
|
|
from reportlab.lib.units import cm
|
|
from reportlab.pdfgen import canvas
|
|
from app.crud import (
|
|
get_shipments,
|
|
get_shipment_by_id,
|
|
) # Import CRUD functions for shipment
|
|
|
|
# Router on which every endpoint defined in this module is registered.
router = APIRouter()
|
|
|
|
|
|
def generate_unique_id(db: Session, length: int = 16) -> str:
    """Return a random hexadecimal identifier that no dewar uses yet.

    Candidates come from the operating system's CSPRNG via ``secrets``
    instead of being sampled from a hash of the current time, so the
    identifier carries the full entropy its length allows.

    Args:
        db: Session used to look candidates up in ``DewarModel.unique_id``.
        length: Number of hexadecimal characters in the identifier.

    Returns:
        A lowercase hexadecimal string of ``length`` characters for which no
        dewar row with the same ``unique_id`` was found at lookup time.
    """
    # Local import: keeps this function self-contained without touching the
    # module-level import block.
    import secrets

    # token_hex(n) yields 2 * n characters, so round up and trim afterwards
    # to support odd lengths; non-positive lengths give an empty string.
    wanted = max(0, length)
    n_bytes = (wanted + 1) // 2

    while True:
        unique_id = secrets.token_hex(n_bytes)[:wanted]
        existing_dewar = (
            db.query(DewarModel).filter(DewarModel.unique_id == unique_id).first()
        )
        if not existing_dewar:
            return unique_id
|
|
|
|
|
|
def _raise_if_dewar_contents_have_events(db: Session, dewar) -> None:
    """Raise HTTP 400 if any puck or sample of ``dewar`` has a recorded event."""
    for puck in dewar.pucks:
        puck_event_exists = (
            db.query(PuckEvent).filter(PuckEvent.puck_id == puck.id).first()
        )
        if puck_event_exists:
            raise HTTPException(
                status_code=400,
                detail=f"Puck {puck.id} "
                f"associated with Dewar {dewar.id}"
                f" has events. "
                f"Update not allowed.",
            )

        for sample in puck.samples:
            sample_event_exists = (
                db.query(SampleEvent)
                .filter(SampleEvent.sample_id == sample.id)
                .first()
            )
            if sample_event_exists:
                raise HTTPException(
                    status_code=400,
                    detail=f"Sample {sample.id} "
                    f"associated with Puck "
                    f"{puck.id} in Dewar "
                    f"{dewar.id} "
                    f"has events. Update not allowed.",
                )


def _add_pucks_with_samples(db: Session, dewar_id: int, pucks) -> None:
    """Stage one puck row, plus its sample rows, for every entry in ``pucks``.

    Nothing is committed here; the caller owns the transaction.
    """
    for puck_data in pucks:
        puck = PuckModel(
            dewar_id=dewar_id,
            puck_name=puck_data.puck_name,
            puck_type=puck_data.puck_type,
            puck_location_in_dewar=puck_data.puck_location_in_dewar,
        )
        db.add(puck)
        # Flush (not commit) so puck.id is available for the samples while
        # the surrounding transaction stays open.
        # Assumes the puck id is generated by the database - TODO confirm.
        db.flush()

        for sample_data in puck_data.samples:
            db.add(
                SampleModel(
                    puck_id=puck.id,
                    sample_name=sample_data.sample_name,
                    proteinname=sample_data.proteinname,
                    position=sample_data.position,
                    priority=sample_data.priority,
                    comments=sample_data.comments,
                    data_collection_parameters=dict(
                        sample_data.data_collection_parameters or {}
                    ),
                )
            )


@router.post("/", response_model=DewarSchema, status_code=status.HTTP_201_CREATED)
async def create_or_update_dewar(
    shipment_id: int,
    dewar: DewarCreate,
    db: Session = Depends(get_db),
) -> DewarSchema:
    """Create a dewar in a shipment, or replace the contents of an existing one.

    A dewar is looked up by ``dewar.dewar_name`` within ``shipment_id``. If one
    exists, its pucks and samples are deleted and recreated from the payload,
    provided none of them has a puck or sample event. Otherwise a new dewar is
    created with the payload's pucks and samples.

    All writes of a request happen in a single transaction that is committed
    once at the end, so a failure part-way through no longer leaves the dewar
    with its old contents deleted and only some of the new ones inserted.

    Args:
        shipment_id: Shipment the dewar belongs to.
        dewar: Payload describing the dewar, its pucks and their samples.
        db: Database session provided by ``get_db``.

    Returns:
        The created or updated dewar ORM object, refreshed after the commit.

    Raises:
        HTTPException: 400 if a puck or sample of the existing dewar has
            events; 500 if the database reports an error (after a rollback).
    """
    try:
        # Query existing dewar by name within the shipment
        existing_dewar = (
            db.query(DewarModel)
            .filter(
                DewarModel.dewar_name == dewar.dewar_name,
                DewarModel.shipment_id == shipment_id,
            )
            .first()
        )

        if existing_dewar:
            logging.debug(
                f"Updating existing dewar with name "
                f"{dewar.dewar_name} in shipment {shipment_id}"
            )

            # Refuse the update before touching any row if events exist.
            _raise_if_dewar_contents_have_events(db, existing_dewar)

            # Delete associated pucks and samples now that no events were found
            for puck in existing_dewar.pucks:
                for sample in puck.samples:
                    db.delete(sample)
                db.delete(puck)
            # Emit the DELETEs before the replacement INSERTs are staged so the
            # old rows are gone first within the same transaction.
            db.flush()

            # Update dewar metadata.
            # NOTE(review): only attributes exposed on the model class as a
            # settable ``property`` pass this check; plain mapped columns are
            # presumably not ``property`` instances and would be skipped -
            # confirm this is intended.
            for key, value in dewar.dict(
                exclude={"pucks", "number_of_pucks", "number_of_samples"}
            ).items():
                if (
                    hasattr(existing_dewar, key)
                    and hasattr(type(existing_dewar), key)
                    and isinstance(getattr(type(existing_dewar), key), property)
                    and getattr(type(existing_dewar), key).fset
                ):
                    setattr(existing_dewar, key, value)

            # Add new pucks and samples, then persist everything at once
            _add_pucks_with_samples(db, existing_dewar.id, dewar.pucks)
            db.commit()

            db.refresh(existing_dewar)
            return existing_dewar

        # Create a completely new dewar if none exists
        dewar_obj = DewarModel(
            dewar_name=dewar.dewar_name,
            tracking_number=dewar.tracking_number,
            status=dewar.status,
            ready_date=dewar.ready_date,
            shipping_date=dewar.shipping_date,
            arrival_date=dewar.arrival_date,
            returning_date=dewar.returning_date,
            contact_person_id=dewar.contact_person_id,
            return_address_id=dewar.return_address_id,
            shipment_id=shipment_id,  # Associate with the shipment
        )
        db.add(dewar_obj)
        # Flush so dewar_obj.id can be used as the pucks' foreign key.
        db.flush()

        # Add pucks and samples for the new dewar, then commit once
        _add_pucks_with_samples(db, dewar_obj.id, dewar.pucks)
        db.commit()
        db.refresh(dewar_obj)

        return dewar_obj

    except SQLAlchemyError as e:
        # Discard the partial transaction so the session is usable again and
        # nothing half-written is persisted.
        db.rollback()
        logging.error(f"Database error occurred: {e}")
        raise HTTPException(status_code=500, detail="Internal server error") from e
|
|
|
|
|
|
@router.post("/{dewar_id}/generate-qrcode")
async def generate_dewar_qrcode(dewar_id: int, db: Session = Depends(get_db)):
    """Generate a QR code for a dewar and store it on the dewar row.

    A ``unique_id`` is assigned first if the dewar has none. The QR code
    encodes that identifier; the identifier is stored in ``dewar.qrcode`` and
    the rendered image bytes in ``dewar.qrcode_image``. The identifier and the
    image are committed together.

    Args:
        dewar_id: Primary key of the dewar.
        db: Database session provided by ``get_db``.

    Returns:
        A dict with a confirmation message and the encoded identifier.

    Raises:
        HTTPException: 404 if no dewar with ``dewar_id`` exists.
    """
    dewar = db.query(DewarModel).filter(DewarModel.id == dewar_id).first()
    if not dewar:
        raise HTTPException(status_code=404, detail="Dewar not found")

    if not dewar.unique_id:
        dewar.unique_id = generate_unique_id(db)

    qr = qrcode.QRCode(version=1, box_size=10, border=5)
    qr.add_data(dewar.unique_id)
    qr.make(fit=True)
    # ``fill_color`` is the keyword qrcode documents for the module colour;
    # the previous ``fill`` keyword is not one it documents.
    img = qr.make_image(fill_color="black", back_color="white")

    buf = io.BytesIO()
    # No explicit format: the image factory's default is used
    # (presumably PNG - confirm).
    img.save(buf)

    dewar.qrcode = dewar.unique_id
    dewar.qrcode_image = buf.getvalue()
    db.commit()

    return {"message": "QR Code generated", "qrcode": dewar.unique_id}
|
|
|
|
|
|
def generate_label(dewar):
    """Render an A5 landscape PDF label for ``dewar`` and return it in memory.

    The label shows a header, the ``Heidi-logo.png`` image located next to
    this module, shipment/dewar/contact/return-address details when present,
    and a QR code encoding ``dewar.unique_id``.

    Args:
        dewar: Dewar object exposing ``dewar_name``, ``unique_id`` and the
            optional ``shipment``, ``contact_person`` and ``return_address``
            relationships read below.

    Returns:
        A ``BytesIO`` positioned at offset 0 that holds the PDF document.
    """
    buffer = BytesIO()
    # Set page orientation to landscape
    c = canvas.Canvas(buffer, pagesize=landscape(A5))

    # Dimensions for the A5 landscape
    page_width, page_height = landscape(A5)

    # Path to the PNG logo
    file_dir = os.path.dirname(os.path.abspath(__file__))
    png_logo_path = os.path.join(file_dir, "Heidi-logo.png")

    # Open the logo with PIL only to read its size; the context manager
    # closes the file handle again right away.
    with Image.open(png_logo_path) as logo_image:
        logo_aspect_ratio = logo_image.width / logo_image.height

    # Desired logo width in the PDF (you can adjust this size)
    desired_logo_width = 4 * cm
    desired_logo_height = desired_logo_width / logo_aspect_ratio  # keep ratio

    # Draw header text
    c.setFont("Helvetica-Bold", 16)
    c.drawString(2 * cm, page_height - 2 * cm, "Paul Scherrer Institut")

    # Draw the Heidi logo with preserved aspect ratio
    c.drawImage(
        png_logo_path,
        page_width - desired_logo_width - 2 * cm,
        page_height - desired_logo_height - 2 * cm,
        width=desired_logo_width,
        height=desired_logo_height,
        mask="auto",
    )

    # Draw details section
    c.setFont("Helvetica", 12)

    # Start below the header so the text doesn't overlap with the logo
    y_position = page_height - 4 * cm
    line_height = 0.8 * cm

    if dewar.shipment:
        c.drawString(
            2 * cm, y_position, f"Shipment Name: {dewar.shipment.shipment_name}"
        )
        y_position -= line_height

    c.drawString(2 * cm, y_position, f"Dewar Name: {dewar.dewar_name}")
    y_position -= line_height

    c.drawString(2 * cm, y_position, f"Unique ID: {dewar.unique_id}")
    y_position -= line_height

    if dewar.contact_person:
        contact_person = dewar.contact_person
        c.drawString(
            2 * cm,
            y_position,
            f"Contact: {contact_person.firstname} {contact_person.lastname}",
        )
        y_position -= line_height
        c.drawString(2 * cm, y_position, f"Email: {contact_person.email}")
        y_position -= line_height
        c.drawString(2 * cm, y_position, f"Phone: {contact_person.phone_number}")
        y_position -= line_height

    if dewar.return_address:
        return_address = dewar.return_address
        c.drawString(2 * cm, y_position, f"Return Address: {return_address.street}")
        y_position -= line_height
        c.drawString(2 * cm, y_position, f"City: {return_address.city}")
        y_position -= line_height
        c.drawString(2 * cm, y_position, f"Postal Code: {return_address.zipcode}")
        y_position -= line_height
        c.drawString(2 * cm, y_position, f"Country: {return_address.country}")
        y_position -= line_height

    c.drawString(2 * cm, y_position, "Beamtime Information: Placeholder")

    # Generate QR code (``fill_color`` is the keyword qrcode documents)
    qr = qrcode.QRCode(version=1, box_size=10, border=4)
    qr.add_data(dewar.unique_id)
    qr.make(fit=True)
    qr_img = qr.make_image(fill_color="black", back_color="white").convert("RGBA")

    # The QR code goes through a temporary PNG file that drawImage reads by
    # path. try/finally guarantees the file is removed even if saving the
    # image or rendering the PDF raises.
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".png")
    temp_file_path = temp_file.name
    try:
        with temp_file:
            qr_img.save(temp_file, format="PNG")

        # Add QR code to PDF
        c.drawImage(
            temp_file_path, page_width - 6 * cm, 5 * cm, width=4 * cm, height=4 * cm
        )

        # Add footer text
        c.setFont("Helvetica", 10)
        c.drawCentredString(page_width / 2, 2 * cm, "Scan for more information")

        # Draw border, inset 1 cm from every page edge
        c.setLineWidth(1)
        c.rect(1 * cm, 1 * cm, page_width - 2 * cm, page_height - 2 * cm)

        # Finalize the canvas
        c.showPage()
        c.save()
    finally:
        # Cleanup temporary file
        os.remove(temp_file_path)

    buffer.seek(0)
    return buffer
|
|
|
|
|
|
@router.get("/{dewar_id}/download-label", response_class=Response)
async def download_dewar_label(dewar_id: int, db: Session = Depends(get_db)):
    """Return the PDF label of a dewar as a file attachment.

    Responds with 404 when the dewar does not exist or has no ``unique_id``
    yet; otherwise the label produced by ``generate_label`` is sent as
    ``application/pdf``.
    """
    # Eager-load everything the label renderer reads from the dewar.
    eager_options = (
        joinedload(DewarModel.pucks).joinedload(PuckModel.samples),
        joinedload(DewarModel.contact_person),
        joinedload(DewarModel.return_address),
        joinedload(DewarModel.shipment),
    )
    query = db.query(DewarModel).options(*eager_options)
    dewar = query.filter(DewarModel.id == dewar_id).first()

    if not dewar:
        raise HTTPException(status_code=404, detail="Dewar not found")
    if not dewar.unique_id:
        raise HTTPException(
            status_code=404, detail="QR Code not generated for this dewar"
        )

    pdf_bytes = generate_label(dewar).getvalue()
    disposition = f"attachment; filename=dewar_label_{dewar.id}.pdf"

    return Response(
        pdf_bytes,
        media_type="application/pdf",
        headers={"Content-Disposition": disposition},
    )
|
|
|
|
|
|
@router.put("/samples/{sample_id}", response_model=Sample)
async def update_sample(
    sample_id: int,
    sample_update: SampleUpdate,
    db: Session = Depends(get_db),
):
    """Apply the fields set in ``sample_update`` to a sample and return it.

    Only fields explicitly present in the payload are considered, and only
    those that exist as attributes on the sample are written. Responds with
    404 when the sample does not exist.
    """
    changes = sample_update.dict(exclude_unset=True)

    # Record what the frontend sent before touching the database.
    logging.info(f"Payload received for sample ID {sample_id}: " f"{changes}")

    sample = db.query(SampleModel).filter(SampleModel.id == sample_id).first()
    if not sample:
        logging.error(f"Sample with ID {sample_id} not found")
        raise HTTPException(status_code=404, detail="Sample not found")

    for field, new_value in changes.items():
        if not hasattr(sample, field):
            continue
        setattr(sample, field, new_value)

    # Persist, then reload so the response reflects the stored state.
    db.commit()
    db.refresh(sample)

    updated = Sample.from_orm(sample)
    logging.info(f"Updated sample with ID {sample_id}: {updated}")

    return updated
|
|
|
|
|
|
@router.get("/dewars/{dewar_id}/samples", response_model=Dewar)
async def get_dewar_samples(dewar_id: int, db: Session = Depends(get_db)):
    """Return a dewar with its pucks, samples and sample events fully nested.

    The nested schema objects are built by hand from the ORM rows, and the
    puck and sample counts are computed from the loaded relationships.
    Responds with 404 when the dewar does not exist.
    """
    dewar = db.query(DewarModel).filter(DewarModel.id == dewar_id).first()
    if not dewar:
        raise HTTPException(status_code=404, detail="Dewar not found")

    # Build the nested puck -> sample -> event structure bottom-up.
    puck_schemas = []
    for puck in dewar.pucks:
        sample_schemas = []
        for sample in puck.samples:
            event_schemas = [
                SampleEventResponse.from_orm(event) for event in sample.events
            ]
            sample_schemas.append(
                Sample(
                    id=sample.id,
                    sample_name=sample.sample_name,
                    position=sample.position,
                    puck_id=sample.puck_id,
                    # These attributes may be absent on the model; fall back
                    # to None instead of failing.
                    crystalname=getattr(sample, "crystalname", None),
                    proteinname=getattr(sample, "proteinname", None),
                    positioninpuck=getattr(sample, "positioninpuck", None),
                    priority=sample.priority,
                    comments=sample.comments,
                    data_collection_parameters=sample.data_collection_parameters,
                    events=event_schemas,
                )
            )
        puck_schemas.append(
            Puck(
                id=puck.id,
                puck_name=puck.puck_name,
                puck_type=puck.puck_type,
                puck_location_in_dewar=puck.puck_location_in_dewar,
                dewar_id=dewar.id,
                samples=sample_schemas,
            )
        )

    if dewar.contact_person:
        contact_person_id = dewar.contact_person.id
    else:
        contact_person_id = None

    if dewar.return_address:
        return_address_id = dewar.return_address.id
    else:
        return_address_id = None

    total_samples = sum(len(puck.samples) for puck in dewar.pucks)

    # Return the Pydantic Dewar model
    return Dewar(
        id=dewar.id,
        dewar_name=dewar.dewar_name,
        tracking_number=dewar.tracking_number,
        status=dewar.status,
        number_of_pucks=len(dewar.pucks),
        number_of_samples=total_samples,
        ready_date=dewar.ready_date,
        shipping_date=dewar.shipping_date,
        arrival_date=dewar.arrival_date,
        returning_date=dewar.returning_date,
        contact_person_id=contact_person_id,
        return_address_id=return_address_id,
        shipment_id=dewar.shipment_id,
        contact_person=dewar.contact_person,
        return_address=dewar.return_address,
        pucks=puck_schemas,
    )
|
|
|
|
|
|
@router.get("/", response_model=List[DewarSchema])
async def get_dewars(db: Session = Depends(get_db)):
    """Return every dewar, with its pucks loaded in the same query.

    A database error is logged and reported to the client as HTTP 500.
    """
    query = db.query(DewarModel).options(joinedload(DewarModel.pucks))
    try:
        return query.all()
    except SQLAlchemyError as exc:
        logging.error(f"Database error occurred: {exc}")
        raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
|
|
@router.get("/dewar-types", response_model=List[DewarTypeSchema])
def get_dewar_types(db: Session = Depends(get_db)):
    """Return all dewar type rows."""
    dewar_types = db.query(DewarTypeModel).all()
    return dewar_types
|
|
|
|
|
|
@router.get(
    "/dewar-types/{type_id}/serial-numbers",
    response_model=List[DewarSerialNumberSchema],
)
def get_serial_numbers(type_id: int, db: Session = Depends(get_db)):
    """Return the serial numbers whose ``dewar_type_id`` equals ``type_id``."""
    belongs_to_type = DewarSerialNumberModel.dewar_type_id == type_id
    serial_numbers = db.query(DewarSerialNumberModel).filter(belongs_to_type).all()
    return serial_numbers
|
|
|
|
|
|
@router.post("/dewar-types", response_model=DewarTypeSchema)
def create_dewar_type(dewar_type: DewarTypeCreate, db: Session = Depends(get_db)):
    """Insert a dewar type built from the payload and return the stored row."""
    payload = dewar_type.dict()
    new_type = DewarTypeModel(**payload)

    db.add(new_type)
    db.commit()
    # Reload so database-populated values are present on the returned row.
    db.refresh(new_type)

    return new_type
|
|
|
|
|
|
@router.post("/dewar-serial-numbers", response_model=DewarSerialNumberSchema)
def create_dewar_serial_number(
    serial_number: DewarSerialNumberCreate, db: Session = Depends(get_db)
):
    """Insert a dewar serial number built from the payload and return it."""
    payload = serial_number.dict()
    new_serial = DewarSerialNumberModel(**payload)

    db.add(new_serial)
    db.commit()
    # Reload so database-populated values are present on the returned row.
    db.refresh(new_serial)

    return new_serial
|
|
|
|
|
|
@router.get("/dewar-serial-numbers", response_model=List[DewarSerialNumberSchema])
def get_all_serial_numbers(db: Session = Depends(get_db)):
    """Return every dewar serial number row.

    A database error is logged and reported to the client as HTTP 500.
    """
    query = db.query(DewarSerialNumberModel)
    try:
        return query.all()
    except SQLAlchemyError as exc:
        logging.error(f"Database error occurred: {exc}")
        raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
|
|
@router.get("/{dewar_id}", response_model=DewarSchema)
async def get_dewar(dewar_id: int, db: Session = Depends(get_db)):
    """Return one dewar with pucks, samples, contact, address and shipment.

    Responds with 404 when no dewar with ``dewar_id`` exists.
    """
    # Load all related rows together with the dewar.
    eager_options = (
        joinedload(DewarModel.pucks).joinedload(PuckModel.samples),
        joinedload(DewarModel.contact_person),
        joinedload(DewarModel.return_address),
        joinedload(DewarModel.shipment),
    )
    query = db.query(DewarModel).options(*eager_options)
    dewar = query.filter(DewarModel.id == dewar_id).first()

    if not dewar:
        raise HTTPException(status_code=404, detail="Dewar not found")

    return DewarSchema.from_orm(dewar)
|
|
|
|
|
|
@router.put("/{dewar_id}", response_model=DewarSchema)
async def update_dewar(
    dewar_id: int, dewar_update: DewarUpdate, db: Session = Depends(get_db)
) -> DewarSchema:
    """Apply the fields set in ``dewar_update`` to a dewar and return it.

    Only fields explicitly present in the payload are considered, and only
    those that exist as attributes on the dewar are written. Responds with
    404 when the dewar does not exist.
    """
    dewar = db.query(DewarModel).filter(DewarModel.id == dewar_id).first()
    if not dewar:
        raise HTTPException(status_code=404, detail="Dewar not found")

    changes = dewar_update.dict(exclude_unset=True)
    for field, new_value in changes.items():
        if not hasattr(dewar, field):
            continue
        setattr(dewar, field, new_value)

    # Persist, then reload so the response reflects the stored state.
    db.commit()
    db.refresh(dewar)

    return dewar
|
|
|
|
|
|
@router.delete("/{dewar_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_dewar(dewar_id: int, db: Session = Depends(get_db)):
    """Delete a dewar together with its pucks and samples.

    Deletion is refused if the dewar has logistics events, or if any of its
    pucks or samples has events. All checks run before anything is deleted.

    Args:
        dewar_id: Primary key of the dewar to delete.
        db: Database session provided by ``get_db``.

    Raises:
        HTTPException: 404 if the dewar does not exist; 400 if a logistics,
            puck or sample event blocks the deletion.
    """
    # Fetch the Dewar from the database
    dewar = db.query(DewarModel).filter(DewarModel.id == dewar_id).first()
    if not dewar:
        raise HTTPException(status_code=404, detail="Dewar not found")

    # Check for associated logistics events
    logistics_event_exists = (
        db.query(LogisticsEvent).filter(LogisticsEvent.dewar_id == dewar_id).first()
    )
    if logistics_event_exists:
        raise HTTPException(
            status_code=400,
            # The sentences were previously joined without a space.
            detail=f"Dewar {dewar_id} has associated logistics events. "
            f"Deletion not allowed.",
        )

    # Check associated pucks and their events
    for puck in dewar.pucks:
        puck_event_exists = (
            db.query(PuckEvent).filter(PuckEvent.puck_id == puck.id).first()
        )
        if puck_event_exists:
            raise HTTPException(
                status_code=400,
                # The sentences were previously joined without a space.
                detail=f"Puck {puck.id} "
                f"associated with this Dewar has events. "
                f"Deletion not allowed.",
            )

        # Check associated samples and their events
        for sample in puck.samples:
            sample_event_exists = (
                db.query(SampleEvent).filter(SampleEvent.sample_id == sample.id).first()
            )
            if sample_event_exists:
                raise HTTPException(
                    status_code=400,
                    detail=f"Sample {sample.id} "
                    f"associated with Puck {puck.id} has events."
                    f" Deletion not allowed.",
                )

    # Perform cascade deletion by hand: samples, then pucks, then the dewar
    for puck in dewar.pucks:
        for sample in puck.samples:
            db.delete(sample)  # Delete associated samples
        db.delete(puck)  # Delete associated puck
    db.delete(dewar)  # Finally, delete the dewar itself

    db.commit()
    return
|
|
|
|
|
|
# New routes for shipments
|
|
@router.get("/shipments", response_model=List[ShipmentSchema])
async def get_all_shipments(db: Session = Depends(get_db)):
    """Return all shipments as provided by the ``get_shipments`` CRUD helper.

    A database error is logged and reported to the client as HTTP 500.
    """
    # NOTE(review): this path is registered after GET "/{dewar_id}" on the
    # same router, so a request for "/shipments" appears to be matched by
    # that earlier route first - confirm and reorder if so.
    try:
        return get_shipments(db)
    except SQLAlchemyError as exc:
        logging.error(f"Database error occurred: {exc}")
        raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
|
|
@router.get("/shipments/{id}", response_model=ShipmentSchema)
async def get_single_shipment(id: int, db: Session = Depends(get_db)):
    """Return the shipment with the given id.

    Responds with 404 when ``get_shipment_by_id`` returns ``None``; a
    database error is logged and reported to the client as HTTP 500.
    """
    try:
        shipment = get_shipment_by_id(db, id)
    except SQLAlchemyError as exc:
        logging.error(f"Database error occurred: {exc}")
        raise HTTPException(status_code=500, detail="Internal server error")

    # The 404 is not a database error, so it is raised outside the try block.
    if shipment is None:
        raise HTTPException(status_code=404, detail="Shipment not found")

    return shipment
|