
Upgraded multiple dependencies across Babel, Emotion, ESLint, and DevExpress packages in `package-lock.json` to their latest versions. These updates ensure compatibility, fix minor issues, and improve overall performance and security.
666 lines
22 KiB
Python
666 lines
22 KiB
Python
import os
|
|
import tempfile
|
|
import time
|
|
import random
|
|
import hashlib
|
|
from fastapi import APIRouter, HTTPException, status, Depends, Response
|
|
from sqlalchemy.orm import Session, joinedload
|
|
from typing import List
|
|
import logging
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from app.schemas import (
|
|
Dewar as DewarSchema,
|
|
DewarCreate,
|
|
DewarUpdate,
|
|
DewarType as DewarTypeSchema,
|
|
DewarTypeCreate,
|
|
DewarSerialNumber as DewarSerialNumberSchema,
|
|
DewarSerialNumberCreate,
|
|
Shipment as ShipmentSchema,
|
|
Dewar,
|
|
SampleUpdate,
|
|
Sample,
|
|
Puck,
|
|
SampleEventResponse, # Clearer name for schema
|
|
)
|
|
from app.models import (
|
|
Dewar as DewarModel,
|
|
Puck as PuckModel,
|
|
Sample as SampleModel,
|
|
DewarType as DewarTypeModel,
|
|
DewarSerialNumber as DewarSerialNumberModel,
|
|
LogisticsEvent,
|
|
PuckEvent,
|
|
SampleEvent,
|
|
)
|
|
from app.dependencies import get_db
|
|
import qrcode
|
|
import io
|
|
from io import BytesIO
|
|
from PIL import Image
|
|
from reportlab.lib.pagesizes import A5, landscape
|
|
from reportlab.lib.units import cm
|
|
from reportlab.pdfgen import canvas
|
|
from app.crud import (
|
|
get_shipments,
|
|
get_shipment_by_id,
|
|
) # Import CRUD functions for shipment
|
|
|
|
# Router on which every endpoint in this module is registered.
router = APIRouter()
|
|
|
|
|
|
def generate_unique_id(db: Session, length: int = 16) -> str:
    """Return a random lowercase-hex ID that no stored dewar currently uses.

    Args:
        db: Session used to check candidate IDs against ``DewarModel.unique_id``.
        length: Number of hexadecimal characters in the returned ID.

    Returns:
        A string of ``length`` hex characters for which no dewar row was found.
    """
    # Function-scope import keeps this block self-contained; ``secrets`` draws
    # from the OS CSPRNG instead of the predictable ``random`` generator.
    import secrets

    size = max(length, 0)
    while True:
        # token_hex(n) yields 2 * n characters, so round up and trim to
        # support odd lengths as well.
        candidate = secrets.token_hex((size + 1) // 2)[:size]
        already_used = (
            db.query(DewarModel).filter(DewarModel.unique_id == candidate).first()
        )
        if not already_used:
            return candidate
|
|
|
|
|
|
def _reject_if_events_exist(db: Session, existing_dewar) -> None:
    """Raise HTTP 400 if any puck or sample of the dewar has recorded events."""
    for puck in existing_dewar.pucks:
        puck_event_exists = (
            db.query(PuckEvent).filter(PuckEvent.puck_id == puck.id).first()
        )
        if puck_event_exists:
            raise HTTPException(
                status_code=400,
                detail=f"Puck '{puck.puck_name}'"
                f" (ID: {puck.id}) has associated "
                f"events and cannot be deleted or replaced.",
            )

        for sample in puck.samples:
            sample_event_exists = (
                db.query(SampleEvent)
                .filter(SampleEvent.sample_id == sample.id)
                .first()
            )
            if sample_event_exists:
                raise HTTPException(
                    status_code=400,
                    detail=f"Sample '{sample.sample_name}'"
                    f" (ID: {sample.id})"
                    f" within Puck '{puck.puck_name}'"
                    f" has associated events. Update forbidden.",
                )


def _add_pucks_and_samples(db: Session, dewar_id: int, pucks) -> None:
    """Persist each puck payload, and its samples, under the given dewar ID."""
    for puck_data in pucks:
        puck = PuckModel(
            dewar_id=dewar_id,
            puck_name=puck_data.puck_name,
            puck_type=puck_data.puck_type,
            puck_location_in_dewar=puck_data.puck_location_in_dewar,
        )
        db.add(puck)
        db.commit()
        # Refresh so puck.id is populated before it is used as a foreign key.
        db.refresh(puck)

        for sample_data in puck_data.samples:
            sample = SampleModel(
                puck_id=puck.id,
                sample_name=sample_data.sample_name,
                proteinname=sample_data.proteinname,
                position=sample_data.position,
                priority=sample_data.priority,
                comments=sample_data.comments,
                data_collection_parameters=dict(
                    sample_data.data_collection_parameters or {}
                ),
            )
            db.add(sample)
            db.commit()
            db.refresh(sample)


@router.post("/", response_model=DewarSchema, status_code=status.HTTP_201_CREATED)
async def create_or_update_dewar(
    shipment_id: int,
    dewar: DewarCreate,
    db: Session = Depends(get_db),
) -> DewarSchema:
    """Create a dewar in a shipment, or replace the contents of an existing one.

    A dewar is looked up by ``dewar.dewar_name`` within ``shipment_id``. If one
    exists, its pucks and samples are deleted and recreated from the payload;
    otherwise a new dewar is inserted together with its pucks and samples.

    Raises:
        HTTPException: 400 if an existing puck or sample has events attached,
            500 if the database raises ``SQLAlchemyError``.
    """
    try:
        # Query existing dewar by name within the shipment
        existing_dewar = (
            db.query(DewarModel)
            .filter(
                DewarModel.dewar_name == dewar.dewar_name,
                DewarModel.shipment_id == shipment_id,
            )
            .first()
        )

        if existing_dewar:
            logging.debug(
                f"Updating existing dewar with name "
                f"{dewar.dewar_name} in shipment {shipment_id}"
            )

            # Pucks/samples with events must not be thrown away.
            _reject_if_events_exist(db, existing_dewar)

            # Delete associated pucks and samples now that no events were found
            for puck in existing_dewar.pucks:
                for sample in puck.samples:
                    db.delete(sample)
                db.delete(puck)
            db.commit()

            # Update dewar metadata. Only keys that are Python properties with
            # a setter on the model class are assigned.
            # NOTE(review): plain mapped columns would not pass this check -
            # confirm that skipping them is intended.
            for key, value in dewar.dict(
                exclude={"pucks", "number_of_pucks", "number_of_samples"}
            ).items():
                if (
                    hasattr(existing_dewar, key)
                    and hasattr(type(existing_dewar), key)
                    and isinstance(getattr(type(existing_dewar), key), property)
                    and getattr(type(existing_dewar), key).fset
                ):
                    setattr(existing_dewar, key, value)

            # Commit updates to existing dewar
            db.commit()

            _add_pucks_and_samples(db, existing_dewar.id, dewar.pucks)

            db.refresh(existing_dewar)
            return existing_dewar

        # Create a completely new dewar if none exists
        dewar_obj = DewarModel(
            dewar_name=dewar.dewar_name,
            tracking_number=dewar.tracking_number,
            status=dewar.status,
            ready_date=dewar.ready_date,
            shipping_date=dewar.shipping_date,
            arrival_date=dewar.arrival_date,
            returning_date=dewar.returning_date,
            contact_person_id=dewar.contact_person_id,
            return_address_id=dewar.return_address_id,
            shipment_id=shipment_id,  # Associate with the shipment
        )
        db.add(dewar_obj)
        db.commit()
        db.refresh(dewar_obj)

        _add_pucks_and_samples(db, dewar_obj.id, dewar.pucks)

        return dewar_obj

    except SQLAlchemyError as e:
        # Discard the failed transaction so the session is usable again.
        db.rollback()
        logging.error(f"Database error occurred: {e}")
        raise HTTPException(status_code=500, detail="Internal server error") from e
|
|
|
|
|
|
@router.post("/{dewar_id}/generate-qrcode")
async def generate_dewar_qrcode(dewar_id: int, db: Session = Depends(get_db)):
    """Generate a QR code for a dewar and store it on the dewar row.

    The QR code encodes the dewar's ``unique_id``; one is generated and
    committed first if the dewar does not have one yet.

    Raises:
        HTTPException: 404 if no dewar with ``dewar_id`` exists.
    """
    dewar = db.query(DewarModel).filter(DewarModel.id == dewar_id).first()
    if not dewar:
        raise HTTPException(status_code=404, detail="Dewar not found")

    if not dewar.unique_id:
        dewar.unique_id = generate_unique_id(db)
        db.commit()

    qr = qrcode.QRCode(version=1, box_size=10, border=5)
    qr.add_data(dewar.unique_id)
    qr.make(fit=True)
    # ``fill_color`` is the keyword qrcode reads; the previous ``fill`` was
    # silently ignored.
    img = qr.make_image(fill_color="black", back_color="white")

    buf = io.BytesIO()
    img.save(buf)
    dewar.qrcode = dewar.unique_id
    # getvalue() returns the whole buffer regardless of the stream position.
    dewar.qrcode_image = buf.getvalue()
    db.commit()

    return {"message": "QR Code generated", "qrcode": dewar.unique_id}
|
|
|
|
|
|
def generate_label(dewar):
    """Render an A5-landscape PDF label for a dewar and return it as a buffer.

    The label shows a header, the ``Heidi-logo.png`` image located next to this
    file, the dewar's shipment/contact/return-address details when present, and
    a QR code encoding ``dewar.unique_id``.

    Args:
        dewar: Object exposing ``dewar_name``, ``unique_id``, ``shipment``,
            ``contact_person`` and ``return_address`` attributes.

    Returns:
        A ``BytesIO`` positioned at offset 0 containing the PDF.
    """
    buffer = BytesIO()
    # Set page orientation to landscape
    page_width, page_height = landscape(A5)
    c = canvas.Canvas(buffer, pagesize=landscape(A5))

    # Path to the PNG logo
    file_dir = os.path.dirname(os.path.abspath(__file__))
    png_logo_path = os.path.join(file_dir, "Heidi-logo.png")

    # Open the logo only to read its size; the context manager closes the
    # file handle that was previously left open.
    with Image.open(png_logo_path) as logo_image:
        logo_aspect_ratio = logo_image.width / logo_image.height

    # Desired logo width in the PDF; height follows the original aspect ratio
    desired_logo_width = 4 * cm
    desired_logo_height = desired_logo_width / logo_aspect_ratio

    # Draw header text
    c.setFont("Helvetica-Bold", 16)
    c.drawString(2 * cm, page_height - 2 * cm, "Paul Scherrer Institut")

    # Draw the Heidi logo with preserved aspect ratio
    c.drawImage(
        png_logo_path,
        page_width - desired_logo_width - 2 * cm,
        page_height - desired_logo_height - 2 * cm,
        width=desired_logo_width,
        height=desired_logo_height,
        mask="auto",
    )

    # Collect the detail lines first, then draw them top to bottom
    detail_lines = []
    if dewar.shipment:
        detail_lines.append(f"Shipment Name: {dewar.shipment.shipment_name}")
    detail_lines.append(f"Dewar Name: {dewar.dewar_name}")
    detail_lines.append(f"Unique ID: {dewar.unique_id}")

    if dewar.contact_person:
        contact_person = dewar.contact_person
        detail_lines.append(
            f"Contact: {contact_person.firstname} {contact_person.lastname}"
        )
        detail_lines.append(f"Email: {contact_person.email}")
        detail_lines.append(f"Phone: {contact_person.phone_number}")

    if dewar.return_address:
        return_address = dewar.return_address
        detail_lines.append(f"Return Address: {return_address.street}")
        detail_lines.append(f"City: {return_address.city}")
        detail_lines.append(f"Postal Code: {return_address.zipcode}")
        detail_lines.append(f"Country: {return_address.country}")

    detail_lines.append("Beamtime Information: Placeholder")

    c.setFont("Helvetica", 12)
    # Start below the header so the text does not overlap with the logo
    y_position = page_height - 4 * cm
    line_height = 0.8 * cm
    for text in detail_lines:
        c.drawString(2 * cm, y_position, text)
        y_position -= line_height

    # Generate QR code (``fill_color`` is the keyword qrcode reads; the
    # previous ``fill`` was silently ignored)
    qr = qrcode.QRCode(version=1, box_size=10, border=4)
    qr.add_data(dewar.unique_id)
    qr.make(fit=True)
    qr_img = qr.make_image(fill_color="black", back_color="white").convert("RGBA")

    # Save the QR code to a temporary file; it is closed before being drawn
    with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file:
        qr_img.save(temp_file, format="PNG")
        temp_file_path = temp_file.name

    try:
        # Add QR code to PDF
        c.drawImage(
            temp_file_path, page_width - 6 * cm, 5 * cm, width=4 * cm, height=4 * cm
        )

        # Add footer text
        c.setFont("Helvetica", 10)
        c.drawCentredString(page_width / 2, 2 * cm, "Scan for more information")

        # Draw border
        c.setLineWidth(1)
        c.rect(1 * cm, 1 * cm, page_width - 2 * cm, page_height - 2 * cm)

        # Finalize the canvas
        c.showPage()
        c.save()
    finally:
        # Remove the temporary file even when drawing or saving fails.
        os.remove(temp_file_path)

    buffer.seek(0)
    return buffer
|
|
|
|
|
|
@router.get("/{dewar_id}/download-label", response_class=Response)
async def download_dewar_label(dewar_id: int, db: Session = Depends(get_db)):
    """Return the PDF label of a dewar as a file download.

    Raises:
        HTTPException: 404 if the dewar does not exist or has no ``unique_id``.
    """
    # Relationships to load eagerly together with the dewar row.
    eager_options = (
        joinedload(DewarModel.pucks).joinedload(PuckModel.samples),
        joinedload(DewarModel.contact_person),
        joinedload(DewarModel.return_address),
        joinedload(DewarModel.shipment),
    )
    lookup = (
        db.query(DewarModel).options(*eager_options).filter(DewarModel.id == dewar_id)
    )
    dewar = lookup.first()

    if not dewar:
        raise HTTPException(status_code=404, detail="Dewar not found")
    if not dewar.unique_id:
        raise HTTPException(
            status_code=404, detail="QR Code not generated for this dewar"
        )

    pdf_bytes = generate_label(dewar).getvalue()
    download_headers = {
        "Content-Disposition": f"attachment; filename=dewar_label_{dewar.id}.pdf"
    }
    return Response(
        pdf_bytes,
        media_type="application/pdf",
        headers=download_headers,
    )
|
|
|
|
|
|
@router.put("/samples/{sample_id}", response_model=Sample)
async def update_sample(
    sample_id: int,
    sample_update: SampleUpdate,
    db: Session = Depends(get_db),
):
    """Apply the explicitly-set fields of ``sample_update`` to a stored sample.

    Raises:
        HTTPException: 404 if no sample with ``sample_id`` exists.
    """
    # Only fields the client actually sent are logged and applied.
    changes = sample_update.dict(exclude_unset=True)
    logging.info(f"Payload received for sample ID {sample_id}: " f"{changes}")

    sample = db.query(SampleModel).filter(SampleModel.id == sample_id).first()
    if not sample:
        logging.error(f"Sample with ID {sample_id} not found")
        raise HTTPException(status_code=404, detail="Sample not found")

    # Skip keys the model has no attribute for.
    for field_name, new_value in changes.items():
        if not hasattr(sample, field_name):
            continue
        setattr(sample, field_name, new_value)

    db.commit()
    db.refresh(sample)  # Reload the updated sample object

    updated = Sample.from_orm(sample)
    logging.info(f"Updated sample with ID {sample_id}: {updated}")
    return updated
|
|
|
|
|
|
@router.get("/dewars/{dewar_id}/samples", response_model=Dewar)
async def get_dewar_samples(dewar_id: int, db: Session = Depends(get_db)):
    """Return a dewar with its pucks, samples and sample events nested inside.

    Raises:
        HTTPException: 404 if no dewar with ``dewar_id`` exists.
    """
    dewar = db.query(DewarModel).filter(DewarModel.id == dewar_id).first()
    if not dewar:
        raise HTTPException(status_code=404, detail="Dewar not found")

    contact = dewar.contact_person
    address = dewar.return_address

    # Scalar fields and counts first; the nested pucks are attached below.
    dewar_data = {
        "id": dewar.id,
        "dewar_name": dewar.dewar_name,
        "tracking_number": dewar.tracking_number,
        "status": dewar.status,
        "number_of_pucks": len(dewar.pucks),
        "number_of_samples": sum(len(puck.samples) for puck in dewar.pucks),
        "ready_date": dewar.ready_date,
        "shipping_date": dewar.shipping_date,
        "arrival_date": dewar.arrival_date,
        "returning_date": dewar.returning_date,
        "contact_person_id": contact.id if contact else None,
        "return_address_id": address.id if address else None,
        "shipment_id": dewar.shipment_id,
        "contact_person": contact,
        "return_address": address,
    }

    # Explicitly map the nested relationships onto the Pydantic schemas.
    puck_schemas = []
    for puck in dewar.pucks:
        sample_schemas = []
        for sample in puck.samples:
            event_schemas = [
                SampleEventResponse.from_orm(event) for event in sample.events
            ]
            sample_schemas.append(
                Sample(
                    id=sample.id,
                    sample_name=sample.sample_name,
                    position=sample.position,
                    puck_id=sample.puck_id,
                    # These attributes may be missing on the model; use None then.
                    crystalname=getattr(sample, "crystalname", None),
                    proteinname=getattr(sample, "proteinname", None),
                    positioninpuck=getattr(sample, "positioninpuck", None),
                    priority=sample.priority,
                    comments=sample.comments,
                    data_collection_parameters=sample.data_collection_parameters,
                    events=event_schemas,
                )
            )
        puck_schemas.append(
            Puck(
                id=puck.id,
                puck_name=puck.puck_name,
                puck_type=puck.puck_type,
                puck_location_in_dewar=puck.puck_location_in_dewar,
                dewar_id=dewar.id,
                samples=sample_schemas,
            )
        )
    dewar_data["pucks"] = puck_schemas

    return Dewar(**dewar_data)
|
|
|
|
|
|
@router.get("/", response_model=List[DewarSchema])
async def get_dewars(db: Session = Depends(get_db)):
    """Return all dewars, with their pucks loaded via ``joinedload``.

    Raises:
        HTTPException: 500 if the query raises ``SQLAlchemyError``.
    """
    try:
        all_dewars_query = db.query(DewarModel).options(
            joinedload(DewarModel.pucks)
        )
        return all_dewars_query.all()
    except SQLAlchemyError as exc:
        logging.error(f"Database error occurred: {exc}")
        raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
|
|
@router.get("/dewar-types", response_model=List[DewarTypeSchema])
def get_dewar_types(db: Session = Depends(get_db)):
    """Return every dewar type row."""
    dewar_types = db.query(DewarTypeModel).all()
    return dewar_types
|
|
|
|
|
|
@router.get(
    "/dewar-types/{type_id}/serial-numbers",
    response_model=List[DewarSerialNumberSchema],
)
def get_serial_numbers(type_id: int, db: Session = Depends(get_db)):
    """Return the serial numbers whose ``dewar_type_id`` equals ``type_id``."""
    belongs_to_type = DewarSerialNumberModel.dewar_type_id == type_id
    matching = db.query(DewarSerialNumberModel).filter(belongs_to_type)
    return matching.all()
|
|
|
|
|
|
@router.post("/dewar-types", response_model=DewarTypeSchema)
def create_dewar_type(dewar_type: DewarTypeCreate, db: Session = Depends(get_db)):
    """Insert a new dewar type built from the request payload and return it."""
    payload = dewar_type.dict()
    new_type = DewarTypeModel(**payload)
    db.add(new_type)
    db.commit()
    # Refresh the instance from the database before returning it.
    db.refresh(new_type)
    return new_type
|
|
|
|
|
|
@router.post("/dewar-serial-numbers", response_model=DewarSerialNumberSchema)
def create_dewar_serial_number(
    serial_number: DewarSerialNumberCreate, db: Session = Depends(get_db)
):
    """Insert a new dewar serial number from the request payload and return it."""
    payload = serial_number.dict()
    new_serial = DewarSerialNumberModel(**payload)
    db.add(new_serial)
    db.commit()
    # Refresh the instance from the database before returning it.
    db.refresh(new_serial)
    return new_serial
|
|
|
|
|
|
@router.get("/dewar-serial-numbers", response_model=List[DewarSerialNumberSchema])
def get_all_serial_numbers(db: Session = Depends(get_db)):
    """Return every dewar serial number row.

    Raises:
        HTTPException: 500 if the query raises ``SQLAlchemyError``.
    """
    try:
        return db.query(DewarSerialNumberModel).all()
    except SQLAlchemyError as exc:
        logging.error(f"Database error occurred: {exc}")
        raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
|
|
@router.get("/{dewar_id}", response_model=DewarSchema)
async def get_dewar(dewar_id: int, db: Session = Depends(get_db)):
    """Return one dewar, with its related rows loaded via ``joinedload``.

    Raises:
        HTTPException: 404 if no dewar with ``dewar_id`` exists.
    """
    # Relationships to load eagerly together with the dewar row.
    eager_options = (
        joinedload(DewarModel.pucks).joinedload(PuckModel.samples),
        joinedload(DewarModel.contact_person),
        joinedload(DewarModel.return_address),
        joinedload(DewarModel.shipment),
    )
    lookup = (
        db.query(DewarModel).options(*eager_options).filter(DewarModel.id == dewar_id)
    )
    dewar = lookup.first()

    if not dewar:
        raise HTTPException(status_code=404, detail="Dewar not found")

    return DewarSchema.from_orm(dewar)
|
|
|
|
|
|
@router.put("/{dewar_id}", response_model=DewarSchema)
async def update_dewar(
    dewar_id: int, dewar_update: DewarUpdate, db: Session = Depends(get_db)
) -> DewarSchema:
    """Apply the explicitly-set fields of ``dewar_update`` to a stored dewar.

    Raises:
        HTTPException: 404 if no dewar with ``dewar_id`` exists.
    """
    dewar = db.query(DewarModel).filter(DewarModel.id == dewar_id).first()
    if not dewar:
        raise HTTPException(status_code=404, detail="Dewar not found")

    # Only fields the client actually sent are considered; keys the model has
    # no attribute for are skipped.
    changes = dewar_update.dict(exclude_unset=True)
    for field_name, new_value in changes.items():
        if not hasattr(dewar, field_name):
            continue
        setattr(dewar, field_name, new_value)

    db.commit()
    db.refresh(dewar)
    return dewar
|
|
|
|
|
|
@router.delete("/{dewar_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_dewar(dewar_id: int, db: Session = Depends(get_db)):
    """Delete a dewar together with its pucks and samples.

    Deletion is refused when the dewar, any of its pucks, or any of their
    samples has events recorded against it.

    Raises:
        HTTPException: 404 if the dewar does not exist, 400 if a logistics,
            puck or sample event blocks the deletion.
    """
    # Fetch the Dewar from the database
    dewar = db.query(DewarModel).filter(DewarModel.id == dewar_id).first()
    if not dewar:
        raise HTTPException(status_code=404, detail="Dewar not found")

    # Check for associated logistics events
    logistics_event_exists = (
        db.query(LogisticsEvent).filter(LogisticsEvent.dewar_id == dewar_id).first()
    )
    if logistics_event_exists:
        # The sentences were previously concatenated without a separating space.
        raise HTTPException(
            status_code=400,
            detail=f"Dewar {dewar_id} has associated logistics events."
            f" Deletion not allowed.",
        )

    # Check associated pucks and their events
    for puck in dewar.pucks:
        puck_event_exists = (
            db.query(PuckEvent).filter(PuckEvent.puck_id == puck.id).first()
        )
        if puck_event_exists:
            raise HTTPException(
                status_code=400,
                detail=f"Puck {puck.id} "
                f"associated with this Dewar has events."
                f" Deletion not allowed.",
            )

        # Check associated samples and their events
        for sample in puck.samples:
            sample_event_exists = (
                db.query(SampleEvent).filter(SampleEvent.sample_id == sample.id).first()
            )
            if sample_event_exists:
                raise HTTPException(
                    status_code=400,
                    detail=f"Sample {sample.id} "
                    f"associated with Puck {puck.id} has events."
                    f" Deletion not allowed.",
                )

    # Delete children before parents: samples, then pucks, then the dewar
    for puck in dewar.pucks:
        for sample in puck.samples:
            db.delete(sample)
        db.delete(puck)
    db.delete(dewar)

    db.commit()
    return
|
|
|
|
|
|
# New routes for shipments
|
|
# NOTE(review): GET "/{dewar_id}" is registered earlier on this router and
# looks like it also matches "/shipments" - confirm this route is reachable.
@router.get("/shipments", response_model=List[ShipmentSchema])
async def get_all_shipments(db: Session = Depends(get_db)):
    """Return all shipments as provided by ``get_shipments``.

    Raises:
        HTTPException: 500 if the lookup raises ``SQLAlchemyError``.
    """
    try:
        return get_shipments(db)
    except SQLAlchemyError as exc:
        logging.error(f"Database error occurred: {exc}")
        raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
|
|
@router.get("/shipments/{id}", response_model=ShipmentSchema)
async def get_single_shipment(id: int, db: Session = Depends(get_db)):
    """Return the shipment that ``get_shipment_by_id`` finds for ``id``.

    Raises:
        HTTPException: 404 if no shipment is found, 500 if the lookup raises
            ``SQLAlchemyError``.
    """
    try:
        found = get_shipment_by_id(db, id)
        if found is not None:
            return found
        # HTTPException is not a SQLAlchemyError, so it passes through the
        # handler below unchanged.
        raise HTTPException(status_code=404, detail="Shipment not found")
    except SQLAlchemyError as exc:
        logging.error(f"Database error occurred: {exc}")
        raise HTTPException(status_code=500, detail="Internal server error")
|