GotthardG 6d67d02259 Expand sample data handling and grid display.
Integrated additional sample parameters into backend and frontend for enhanced data collection. Updated pyproject version to reflect these changes. This improves user interface flexibility and enriches displayed sample metadata.
2025-01-08 09:52:15 +01:00

476 lines
16 KiB
Python

import os
import tempfile
import time
import random
import hashlib
from fastapi import APIRouter, HTTPException, status, Depends, Response
from sqlalchemy.orm import Session, joinedload
from typing import List
import logging
from sqlalchemy.exc import SQLAlchemyError
from pydantic import ValidationError, BaseModel
from app.schemas import (
Dewar as DewarSchema,
DewarCreate,
DewarUpdate,
DewarType as DewarTypeSchema,
DewarTypeCreate,
DewarSerialNumber as DewarSerialNumberSchema,
DewarSerialNumberCreate,
Shipment as ShipmentSchema, # Clearer name for schema
)
from app.models import (
Dewar as DewarModel,
Puck as PuckModel,
Sample as SampleModel,
DewarType as DewarTypeModel,
DewarSerialNumber as DewarSerialNumberModel,
)
from app.dependencies import get_db
import qrcode
import io
from io import BytesIO
from PIL import Image
from reportlab.lib.pagesizes import A5, landscape
from reportlab.lib.units import cm
from reportlab.pdfgen import canvas
from app.crud import (
get_shipments,
get_shipment_by_id,
) # Import CRUD functions for shipment
router = APIRouter()
def generate_unique_id(db: Session, length: int = 16) -> str:
while True:
base_string = f"{time.time()}{random.randint(0, 10 ** 6)}"
hash_object = hashlib.sha256(base_string.encode())
hash_digest = hash_object.hexdigest()
unique_id = "".join(random.choices(hash_digest, k=length))
existing_dewar = (
db.query(DewarModel).filter(DewarModel.unique_id == unique_id).first()
)
if not existing_dewar:
break
return unique_id
@router.post("/", response_model=DewarSchema, status_code=status.HTTP_201_CREATED)
async def create_dewar(
dewar: DewarCreate, db: Session = Depends(get_db)
) -> DewarSchema:
try:
db_dewar = DewarModel(
dewar_name=dewar.dewar_name,
tracking_number=dewar.tracking_number,
status=dewar.status,
ready_date=dewar.ready_date,
shipping_date=dewar.shipping_date,
arrival_date=dewar.arrival_date,
returning_date=dewar.returning_date,
contact_person_id=dewar.contact_person_id,
return_address_id=dewar.return_address_id,
)
db.add(db_dewar)
db.commit()
db.refresh(db_dewar)
for puck_data in dewar.pucks:
puck = PuckModel(
dewar_id=db_dewar.id,
puck_name=puck_data.puck_name,
puck_type=puck_data.puck_type,
puck_location_in_dewar=puck_data.puck_location_in_dewar,
)
db.add(puck)
db.commit()
db.refresh(puck)
for sample_data in puck_data.samples:
logging.debug(
f"data_collection_parameters: "
f"{sample_data.data_collection_parameters}"
)
if sample_data.data_collection_parameters is None:
serialized_params = {}
elif hasattr(sample_data.data_collection_parameters, "to_dict"):
serialized_params = sample_data.data_collection_parameters.to_dict()
elif isinstance(sample_data.data_collection_parameters, BaseModel):
serialized_params = sample_data.data_collection_parameters.dict(
exclude_unset=True
)
elif isinstance(sample_data.data_collection_parameters, dict):
serialized_params = sample_data.data_collection_parameters
else:
raise ValueError(
"data_collection_parameters must be a dictionary,"
"have a to_dict method, or be None"
)
sample = SampleModel(
puck_id=puck.id,
sample_name=sample_data.sample_name,
proteinname=sample_data.proteinname,
position=sample_data.position,
priority=sample_data.priority,
comments=sample_data.comments,
data_collection_parameters=serialized_params,
)
db.add(sample)
db.commit()
db.refresh(sample)
return db_dewar
except SQLAlchemyError as e:
logging.error(f"Database error occurred: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
except ValidationError as e:
logging.error(f"Validation error occurred: {e}")
raise HTTPException(status_code=400, detail="Validation error")
@router.post("/{dewar_id}/generate-qrcode")
async def generate_dewar_qrcode(dewar_id: int, db: Session = Depends(get_db)):
dewar = db.query(DewarModel).filter(DewarModel.id == dewar_id).first()
if not dewar:
raise HTTPException(status_code=404, detail="Dewar not found")
if not dewar.unique_id:
dewar.unique_id = generate_unique_id(db)
db.commit()
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(dewar.unique_id)
qr.make(fit=True)
img = qr.make_image(fill="black", back_color="white")
buf = io.BytesIO()
img.save(buf)
buf.seek(0)
dewar.qrcode = dewar.unique_id
dewar.qrcode_image = buf.getvalue()
db.commit()
return {"message": "QR Code generated", "qrcode": dewar.unique_id}
def generate_label(dewar):
buffer = BytesIO()
# Set page orientation to landscape
c = canvas.Canvas(buffer, pagesize=landscape(A5))
# Dimensions for the A5 landscape
page_width, page_height = landscape(A5)
# Path to the PNG logo
file_dir = os.path.dirname(os.path.abspath(__file__))
png_logo_path = os.path.join(file_dir, "Heidi-logo.png")
# Open the logo with PIL to get its size
logo_image = Image.open(png_logo_path)
logo_aspect_ratio = logo_image.width / logo_image.height # original aspect ratio
# Desired logo width in the PDF (you can adjust this size)
desired_logo_width = 4 * cm
desired_logo_height = (
desired_logo_width / logo_aspect_ratio
) # maintain aspect ratio
# Draw header text
c.setFont("Helvetica-Bold", 16)
c.drawString(2 * cm, page_height - 2 * cm, "Paul Scherrer Institut")
# Draw the Heidi logo with preserved aspect ratio
c.drawImage(
png_logo_path,
page_width - desired_logo_width - 2 * cm,
page_height - desired_logo_height - 2 * cm,
width=desired_logo_width,
height=desired_logo_height,
mask="auto",
)
# Draw details section
c.setFont("Helvetica", 12)
y_position = (
page_height - 4 * cm
) # Adjusted to ensure text doesn't overlap with the logo
line_height = 0.8 * cm
if dewar.shipment:
c.drawString(
2 * cm, y_position, f"Shipment Name: {dewar.shipment.shipment_name}"
)
y_position -= line_height
c.drawString(2 * cm, y_position, f"Dewar Name: {dewar.dewar_name}")
y_position -= line_height
c.drawString(2 * cm, y_position, f"Unique ID: {dewar.unique_id}")
y_position -= line_height
if dewar.contact_person:
contact_person = dewar.contact_person
c.drawString(
2 * cm,
y_position,
f"Contact: {contact_person.firstname} {contact_person.lastname}",
)
y_position -= line_height
c.drawString(2 * cm, y_position, f"Email: {contact_person.email}")
y_position -= line_height
c.drawString(2 * cm, y_position, f"Phone: {contact_person.phone_number}")
y_position -= line_height
if dewar.return_address:
return_address = dewar.return_address
c.drawString(2 * cm, y_position, f"Return Address: {return_address.street}")
y_position -= line_height
c.drawString(2 * cm, y_position, f"City: {return_address.city}")
y_position -= line_height
c.drawString(2 * cm, y_position, f"Postal Code: {return_address.zipcode}")
y_position -= line_height
c.drawString(2 * cm, y_position, f"Country: {return_address.country}")
y_position -= line_height
c.drawString(2 * cm, y_position, "Beamtime Information: Placeholder")
# Generate QR code
qr = qrcode.QRCode(version=1, box_size=10, border=4)
qr.add_data(dewar.unique_id)
qr.make(fit=True)
qr_img = qr.make_image(fill="black", back_color="white").convert("RGBA")
# Save this QR code to a temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file:
qr_img.save(temp_file, format="PNG")
temp_file_path = temp_file.name
# Add QR code to PDF
c.drawImage(
temp_file_path, page_width - 6 * cm, 5 * cm, width=4 * cm, height=4 * cm
)
# Add footer text
c.setFont("Helvetica", 10)
c.drawCentredString(page_width / 2, 2 * cm, "Scan for more information")
# Draw border
c.setLineWidth(1)
c.rect(
1 * cm, 1 * cm, page_width - 2 * cm, page_height - 2 * cm
) # Adjusted dimensions
# Finalize the canvas
c.showPage()
c.save()
buffer.seek(0)
# Cleanup temporary file
os.remove(temp_file_path)
return buffer
@router.get("/{dewar_id}/download-label", response_class=Response)
async def download_dewar_label(dewar_id: int, db: Session = Depends(get_db)):
dewar = (
db.query(DewarModel)
.options(
joinedload(DewarModel.pucks).joinedload(PuckModel.samples),
joinedload(DewarModel.contact_person),
joinedload(DewarModel.return_address),
joinedload(DewarModel.shipment),
)
.filter(DewarModel.id == dewar_id)
.first()
)
if not dewar:
raise HTTPException(status_code=404, detail="Dewar not found")
if not dewar.unique_id:
raise HTTPException(
status_code=404, detail="QR Code not generated for this dewar"
)
buffer = generate_label(dewar)
return Response(
buffer.getvalue(),
media_type="application/pdf",
headers={
"Content-Disposition": f"attachment; filename=dewar_label_{dewar.id}.pdf"
},
)
@router.get("/dewars/{dewar_id}/samples", response_model=dict)
async def get_dewar_samples(dewar_id: int, db: Session = Depends(get_db)):
# Fetch Dewar, associated Pucks, and Samples
dewar = db.query(DewarModel).filter(DewarModel.id == dewar_id).first()
if not dewar:
raise HTTPException(status_code=404, detail="Dewar not found")
pucks = db.query(PuckModel).filter(PuckModel.dewar_id == dewar.id).all()
data = {"dewar": {"id": dewar.id, "dewar_name": dewar.dewar_name}, "pucks": []}
for puck in pucks:
samples = db.query(SampleModel).filter(SampleModel.puck_id == puck.id).all()
data["pucks"].append(
{
"id": puck.id,
"name": puck.puck_name,
"type": puck.puck_type,
"samples": [
{
"id": sample.id,
"position": sample.position,
"dewar_name": dewar.dewar_name,
"sample_name": sample.sample_name,
"priority": sample.priority,
"comments": sample.comments,
"proteinname": sample.proteinname,
**(sample.data_collection_parameters or {}),
}
for sample in samples
],
}
)
return data
@router.get("/", response_model=List[DewarSchema])
async def get_dewars(db: Session = Depends(get_db)):
try:
dewars = db.query(DewarModel).options(joinedload(DewarModel.pucks)).all()
return dewars
except SQLAlchemyError as e:
logging.error(f"Database error occurred: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/dewar-types", response_model=List[DewarTypeSchema])
def get_dewar_types(db: Session = Depends(get_db)):
return db.query(DewarTypeModel).all()
@router.get(
"/dewar-types/{type_id}/serial-numbers",
response_model=List[DewarSerialNumberSchema],
)
def get_serial_numbers(type_id: int, db: Session = Depends(get_db)):
return (
db.query(DewarSerialNumberModel)
.filter(DewarSerialNumberModel.dewar_type_id == type_id)
.all()
)
@router.post("/dewar-types", response_model=DewarTypeSchema)
def create_dewar_type(dewar_type: DewarTypeCreate, db: Session = Depends(get_db)):
db_type = DewarTypeModel(**dewar_type.dict())
db.add(db_type)
db.commit()
db.refresh(db_type)
return db_type
@router.post("/dewar-serial-numbers", response_model=DewarSerialNumberSchema)
def create_dewar_serial_number(
serial_number: DewarSerialNumberCreate, db: Session = Depends(get_db)
):
db_serial = DewarSerialNumberModel(**serial_number.dict())
db.add(db_serial)
db.commit()
db.refresh(db_serial)
return db_serial
@router.get("/dewar-serial-numbers", response_model=List[DewarSerialNumberSchema])
def get_all_serial_numbers(db: Session = Depends(get_db)):
try:
serial_numbers = db.query(DewarSerialNumberModel).all()
return serial_numbers
except SQLAlchemyError as e:
logging.error(f"Database error occurred: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/{dewar_id}", response_model=DewarSchema)
async def get_dewar(dewar_id: int, db: Session = Depends(get_db)):
dewar = (
db.query(DewarModel)
.options(
joinedload(DewarModel.pucks).joinedload(PuckModel.samples),
joinedload(DewarModel.contact_person),
joinedload(DewarModel.return_address),
joinedload(DewarModel.shipment),
)
.filter(DewarModel.id == dewar_id)
.first()
)
if not dewar:
raise HTTPException(status_code=404, detail="Dewar not found")
return DewarSchema.from_orm(dewar)
@router.put("/{dewar_id}", response_model=DewarSchema)
async def update_dewar(
dewar_id: int, dewar_update: DewarUpdate, db: Session = Depends(get_db)
) -> DewarSchema:
dewar = db.query(DewarModel).filter(DewarModel.id == dewar_id).first()
if not dewar:
raise HTTPException(status_code=404, detail="Dewar not found")
for key, value in dewar_update.dict(exclude_unset=True).items():
if hasattr(dewar, key):
setattr(dewar, key, value)
db.commit()
db.refresh(dewar)
return dewar
@router.delete("/{dewar_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_dewar(dewar_id: int, db: Session = Depends(get_db)):
dewar = db.query(DewarModel).filter(DewarModel.id == dewar_id).first()
if not dewar:
raise HTTPException(status_code=404, detail="Dewar not found")
db.delete(dewar)
db.commit()
return
# New routes for shipments
@router.get("/shipments", response_model=List[ShipmentSchema])
async def get_all_shipments(db: Session = Depends(get_db)):
try:
shipments = get_shipments(db)
return shipments
except SQLAlchemyError as e:
logging.error(f"Database error occurred: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/shipments/{id}", response_model=ShipmentSchema)
async def get_single_shipment(id: int, db: Session = Depends(get_db)):
try:
shipment = get_shipment_by_id(db, id)
if shipment is None:
raise HTTPException(status_code=404, detail="Shipment not found")
return shipment
except SQLAlchemyError as e:
logging.error(f"Database error occurred: {e}")
raise HTTPException(status_code=500, detail="Internal server error")