added a router for logistics, now creating label

This commit is contained in:
GotthardG
2024-11-15 14:04:30 +01:00
parent 6083c72a1d
commit 0eb0bc3486
8 changed files with 2313 additions and 32 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 579 KiB

View File

@ -1,3 +1,6 @@
import os
import tempfile # <-- Add this import
import xml.etree.ElementTree as ET
from fastapi import APIRouter, HTTPException, status, Depends, Response
from sqlalchemy.orm import Session, joinedload
from typing import List
@ -11,28 +14,30 @@ from app.schemas import (
DewarType as DewarTypeSchema,
DewarTypeCreate,
DewarSerialNumber as DewarSerialNumberSchema,
DewarSerialNumberCreate
DewarSerialNumberCreate,
Shipment as ShipmentSchema # Clearer name for schema
)
from app.models import (
Dewar as DewarModel,
Puck as PuckModel,
Sample as SampleModel,
DewarType as DewarTypeModel,
DewarSerialNumber as DewarSerialNumberModel
DewarSerialNumber as DewarSerialNumberModel,
Shipment as ShipmentModel # Clearer name for model
)
from app.dependencies import get_db
import uuid
import qrcode
import io
from io import BytesIO
from PIL import Image
from reportlab.lib.pagesizes import A5
from PIL import ImageFont, ImageDraw, Image
from reportlab.lib.pagesizes import A5, landscape
from reportlab.lib.units import cm
from reportlab.pdfgen import canvas
from app.crud import get_shipments, get_shipment_by_id # Import CRUD functions for shipment
router = APIRouter()
def generate_unique_id(db: Session) -> str:
while True:
unique_id = str(uuid.uuid4())
@ -118,52 +123,113 @@ async def generate_dewar_qrcode(dewar_id: int, db: Session = Depends(get_db)):
def generate_label(dewar):
buffer = BytesIO()
c = canvas.Canvas(buffer, pagesize=A5)
# Set page orientation to landscape
c = canvas.Canvas(buffer, pagesize=landscape(A5))
# Draw header
# Dimensions for the A5 landscape
page_width, page_height = landscape(A5)
# Path to the PNG logo
file_dir = os.path.dirname(os.path.abspath(__file__))
png_logo_path = os.path.join(file_dir, "Heidi-logo.png")
# Open the logo with PIL to get its size
logo_image = Image.open(png_logo_path)
logo_aspect_ratio = logo_image.width / logo_image.height # original aspect ratio
# Desired logo width in the PDF (you can adjust this size)
desired_logo_width = 4 * cm
desired_logo_height = desired_logo_width / logo_aspect_ratio # maintain aspect ratio
# Draw header text
c.setFont("Helvetica-Bold", 16)
c.drawCentredString(10.5 * cm, 14 * cm, "COMPANY LOGO / TITLE")
c.drawString(2 * cm, page_height - 2 * cm, "Paul Scherrer Institut")
# Draw the Heidi logo with preserved aspect ratio
c.drawImage(png_logo_path, page_width - desired_logo_width - 2 * cm,
page_height - desired_logo_height - 2 * cm,
width=desired_logo_width, height=desired_logo_height, mask='auto')
# Draw details section
c.setFont("Helvetica", 12)
c.drawString(2 * cm, 12.5 * cm, f"Dewar Name: {dewar.dewar_name}")
c.drawString(2 * cm, 11.5 * cm, f"Unique ID: {dewar.unique_id}")
if dewar.dewar_type:
c.drawString(2 * cm, 10.5 * cm, f"Dewar Type: {dewar.dewar_type.dewar_type}")
else:
c.drawString(2 * cm, 10.5 * cm, "Dewar Type: Unknown")
c.drawString(2 * cm, 9.5 * cm, "Beamtime Information: Placeholder")
y_position = page_height - 4 * cm # Adjusted to ensure text doesn't overlap with the logo
line_height = 0.8 * cm
if dewar.shipment:
c.drawString(2 * cm, y_position, f"Shipment Name: {dewar.shipment.shipment_name}")
y_position -= line_height
c.drawString(2 * cm, y_position, f"Dewar Name: {dewar.dewar_name}")
y_position -= line_height
c.drawString(2 * cm, y_position, f"Unique ID: {dewar.unique_id}")
y_position -= line_height
if dewar.contact_person:
contact_person = dewar.contact_person
c.drawString(2 * cm, y_position, f"Contact: {contact_person.firstname} {contact_person.lastname}")
y_position -= line_height
c.drawString(2 * cm, y_position, f"Email: {contact_person.email}")
y_position -= line_height
c.drawString(2 * cm, y_position, f"Phone: {contact_person.phone_number}")
y_position -= line_height
if dewar.return_address:
return_address = dewar.return_address
c.drawString(2 * cm, y_position, f"Return Address: {return_address.street}")
y_position -= line_height
c.drawString(2 * cm, y_position, f"City: {return_address.city}")
y_position -= line_height
c.drawString(2 * cm, y_position, f"Postal Code: {return_address.zipcode}")
y_position -= line_height
c.drawString(2 * cm, y_position, f"Country: {return_address.country}")
y_position -= line_height
c.drawString(2 * cm, y_position, f"Beamtime Information: Placeholder")
# Generate QR code
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr = qrcode.QRCode(version=1, box_size=10, border=4)
qr.add_data(dewar.unique_id)
qr.make(fit=True)
img = qr.make_image(fill='black', back_color='white')
qr_io = BytesIO()
img.save(qr_io, format='PNG')
qr_io.seek(0)
qr_image = Image.open(qr_io)
qr_img = qr.make_image(fill='black', back_color='white').convert("RGBA")
# Save this QR code to a temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_file:
qr_img.save(temp_file, format='PNG')
temp_file_path = temp_file.name
# Add QR code to PDF
c.drawInlineImage(qr_image, 8 * cm, 5 * cm, width=4 * cm, height=4 * cm)
c.drawImage(temp_file_path, page_width - 6 * cm, 5 * cm, width=4 * cm, height=4 * cm)
# Add footer text
c.setFont("Helvetica", 10)
c.drawCentredString(10.5 * cm, 4 * cm, "Scan for more information")
c.drawCentredString(page_width / 2, 2 * cm, "Scan for more information")
# Draw border
c.rect(1 * cm, 3 * cm, 18 * cm, 12 * cm)
c.setLineWidth(1)
c.rect(1 * cm, 1 * cm, page_width - 2 * cm, page_height - 2 * cm) # Adjusted dimensions
# Finalize the canvas
c.showPage()
c.save()
buffer.seek(0)
# Cleanup temporary file
os.remove(temp_file_path)
return buffer
@router.get("/{dewar_id}/download-label", response_class=Response)
async def download_dewar_label(dewar_id: int, db: Session = Depends(get_db)):
dewar = db.query(DewarModel).options(joinedload(DewarModel.dewar_type)).filter(DewarModel.id == dewar_id).first()
dewar = db.query(DewarModel).options(
joinedload(DewarModel.pucks).joinedload(PuckModel.samples),
joinedload(DewarModel.contact_person),
joinedload(DewarModel.return_address),
joinedload(DewarModel.shipment)
).filter(DewarModel.id == dewar_id).first()
if not dewar:
raise HTTPException(status_code=404, detail="Dewar not found")
if not dewar.unique_id:
@ -220,7 +286,10 @@ def get_all_serial_numbers(db: Session = Depends(get_db)):
@router.get("/{dewar_id}", response_model=DewarSchema)
async def get_dewar(dewar_id: int, db: Session = Depends(get_db)):
dewar = db.query(DewarModel).options(
joinedload(DewarModel.pucks).joinedload(PuckModel.samples)
joinedload(DewarModel.pucks).joinedload(PuckModel.samples),
joinedload(DewarModel.contact_person),
joinedload(DewarModel.return_address),
joinedload(DewarModel.shipment)
).filter(DewarModel.id == dewar_id).first()
if not dewar:
@ -252,4 +321,25 @@ async def delete_dewar(dewar_id: int, db: Session = Depends(get_db)):
db.delete(dewar)
db.commit()
return
return
# New routes for shipments
@router.get("/shipments", response_model=List[ShipmentSchema])
async def get_all_shipments(db: Session = Depends(get_db)):
try:
shipments = get_shipments(db)
return shipments
except SQLAlchemyError as e:
logging.error(f"Database error occurred: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/shipments/{id}", response_model=ShipmentSchema)
async def get_single_shipment(id: int, db: Session = Depends(get_db)):
try:
shipment = get_shipment_by_id(db, id)
if shipment is None:
raise HTTPException(status_code=404, detail="Shipment not found")
return shipment
except SQLAlchemyError as e:
logging.error(f"Database error occurred: {e}")
raise HTTPException(status_code=500, detail="Internal server error")

View File

@ -0,0 +1,67 @@
from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.orm import Session
from datetime import datetime, timedelta
from typing import List
from app.models import Dewar as DewarModel, Slot as SlotModel, LogisticsEvent as LogisticsEventModel
from app.schemas import LogisticsEventCreate, SlotCreate, Slot as SlotSchema
from app.database import get_db
router = APIRouter()
@router.post("/scan-dewar")
async def scan_dewar(event_data: LogisticsEventCreate, db: Session = Depends(get_db)):
dewar_qr_code = event_data.dewar_qr_code
location_qr_code = event_data.location_qr_code
transaction_type = event_data.transaction_type
dewar = db.query(DewarModel).filter(DewarModel.unique_id == dewar_qr_code).first()
if not dewar:
raise HTTPException(status_code=404, detail="Dewar not found")
if transaction_type == 'incoming':
slot = db.query(SlotModel).filter(SlotModel.id == location_qr_code).first()
if not slot or slot.occupied:
raise HTTPException(status_code=404, detail="Slot not found or already occupied")
slot.occupied = True
log_event(db, dewar.id, slot.id, 'incoming')
elif transaction_type == 'beamline':
log_event(db, dewar.id, None, 'beamline')
elif transaction_type == 'outgoing':
slot = db.query(SlotModel).filter(SlotModel.id == location_qr_code).first()
if not slot or not slot.occupied:
raise HTTPException(status_code=404, detail="Slot not found or not occupied")
slot.occupied = False
log_event(db, dewar.id, slot.id, 'outgoing')
elif transaction_type == 'release':
slot = db.query(SlotModel).filter(SlotModel.id == location_qr_code).first()
if not slot or not slot.occupied:
raise HTTPException(status_code=404, detail="Slot not found or not occupied")
slot.occupied = False
log_event(db, dewar.id, slot.id, 'released')
db.commit()
return {"message": "Status updated successfully"}
def log_event(db: Session, dewar_id: int, slot_id: int, event_type: str):
new_event = LogisticsEventModel(dewar_id=dewar_id, slot_id=slot_id, event_type=event_type)
db.add(new_event)
@router.get("/refill-status", response_model=List[SlotSchema])
async def refill_status(db: Session = Depends(get_db)):
slots_needing_refill = db.query(SlotModel).filter(SlotModel.needs_refill == True).all()
result = []
current_time = datetime.utcnow()
for slot in slots_needing_refill:
time_until_next_refill = slot.last_refill + timedelta(hours=24) - current_time
result.append({
'slot_id': slot.id,
'needs_refill': slot.needs_refill,
'time_until_refill': str(time_until_next_refill)
})
return result