https and ssl integration on the backend, frontend and started integration of logistics app as a separate frontend

This commit is contained in:
GotthardG
2024-11-19 09:56:05 +01:00
parent a91d74b718
commit a931bfb8ec
14 changed files with 264 additions and 267 deletions

View File

@ -1,107 +1,113 @@
from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.orm import Session
import logging
from datetime import datetime, timedelta
from typing import List
from app.models import Dewar as DewarModel, Slot as SlotModel, LogisticsEvent as LogisticsEventModel
from app.schemas import LogisticsEventCreate, SlotCreate, Slot as SlotSchema, Dewar as DewarSchema
from app.schemas import LogisticsEventCreate, Slot as SlotSchema, Dewar as DewarSchema
from app.database import get_db
from app.data import slots_data
import logging
router = APIRouter()
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@router.get("/dewars", response_model=List[DewarSchema])
async def get_all_prouts(db: Session = Depends(get_db)):
try:
dewars = db.query(DewarModel).all()
logging.info(f"Retrieved {len(dewars)} dewars from the database")
return dewars
except Exception as e:
logger.error(f"An error occurred: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
async def get_all_dewars(db: Session = Depends(get_db)):
dewars = db.query(DewarModel).all()
return dewars
@router.get("/dewar/{qr_code}", response_model=DewarSchema)
async def get_dewar_by_qr_code(qr_code: str, db: Session = Depends(get_db)):
logger.info(f"Received qr_code: {qr_code}")
trimmed_qr_code = qr_code.strip()
logger.info(f"Trimmed qr_code after stripping: {trimmed_qr_code}")
@router.get("/dewar/{unique_id}", response_model=DewarSchema)
async def get_dewar_by_unique_id(unique_id: str, db: Session = Depends(get_db)):
dewar = db.query(DewarModel).filter(DewarModel.unique_id == unique_id.strip()).first()
if not dewar:
raise HTTPException(status_code=404, detail="Dewar not found")
return dewar
try:
dewar = db.query(DewarModel).filter(DewarModel.unique_id == trimmed_qr_code).first()
logger.info(f"Query Result: {dewar}")
if dewar:
return dewar
else:
logger.error(f"Dewar not found for unique_id: {trimmed_qr_code}")
raise HTTPException(status_code=404, detail="Dewar not found")
except Exception as e:
logger.error(f"An error occurred: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.post("/dewar/scan", response_model=LogisticsEventCreate)
@router.post("/dewar/scan", response_model=dict)
async def scan_dewar(event_data: LogisticsEventCreate, db: Session = Depends(get_db)):
dewar_qr_code = event_data.dewar_qr_code
location_qr_code = event_data.location_qr_code
transaction_type = event_data.transaction_type
try:
dewar = db.query(DewarModel).filter(DewarModel.unique_id == dewar_qr_code).first()
if not dewar:
raise HTTPException(status_code=404, detail="Dewar not found")
dewar = db.query(DewarModel).filter(DewarModel.unique_id == dewar_qr_code).first()
if not dewar:
raise HTTPException(status_code=404, detail="Dewar not found")
if transaction_type == 'incoming':
slot = db.query(SlotModel).filter(SlotModel.id == location_qr_code).first()
if not slot or slot.occupied:
raise HTTPException(status_code=404, detail="Slot not found or already occupied")
slot.occupied = True
log_event(db, dewar.id, slot.id, 'incoming')
slot = db.query(SlotModel).filter(SlotModel.qr_code == location_qr_code).first()
if transaction_type == 'incoming':
if not slot or slot.occupied:
raise HTTPException(status_code=400, detail="Slot not found or already occupied")
slot.occupied = True
elif transaction_type == 'beamline':
log_event(db, dewar.id, None, 'beamline')
elif transaction_type == 'outgoing':
if not slot or not slot.occupied:
raise HTTPException(status_code=400, detail="Slot not found or not occupied")
slot.occupied = False
elif transaction_type == 'outgoing':
slot = db.query(SlotModel).filter(SlotModel.id == location_qr_code).first()
if not slot or not slot.occupied:
raise HTTPException(status_code=404, detail="Slot not found or not occupied")
slot.occupied = False
log_event(db, dewar.id, slot.id, 'outgoing')
# Log the event
log_event(db, dewar.id, slot.id if slot else None, transaction_type)
elif transaction_type == 'release':
slot = db.query(SlotModel).filter(SlotModel.id == location_qr_code).first()
if not slot or not slot.occupied:
raise HTTPException(status_code=404, detail="Slot not found or not occupied")
slot.occupied = False
log_event(db, dewar.id, slot.id, 'released')
db.commit()
return {"message": "Status updated successfully"}
db.commit()
logger.info(f"Status updated successfully for Dewar ID: {dewar.id}")
return {"message": "Status updated successfully"}
except Exception as e:
logger.error(f"An error occurred: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
def log_event(db: Session, dewar_id: int, slot_id: int, event_type: str):
new_event = LogisticsEventModel(dewar_id=dewar_id if dewar_id else None, slot_id=slot_id, event_type=event_type)
def log_event(db: Session, dewar_id: int, slot_id: str, event_type: str):
new_event = LogisticsEventModel(dewar_id=dewar_id, slot_id=slot_id, event_type=event_type)
db.add(new_event)
db.commit()
logger.info(f"Logged event {event_type} for Dewar ID: {dewar_id}")
@router.get("/slots/refill-status", response_model=List[SlotSchema])
# Convert SQLAlchemy model to dictionary (if necessary)
def slot_to_dict(slot: SlotModel) -> dict:
return {
"id": slot.id,
"qr_code": slot.qr_code,
"label": slot.label,
"qr_base": slot.qr_base,
"occupied": slot.occupied,
"needs_refill": slot.needs_refill,
"last_refill": slot.last_refill.isoformat(),
"time_until_refill": str(slot.time_until_refill) if slot.time_until_refill else None # Ensure correct format
}
@router.get("/slots", response_model=List[dict])
def read_slots(db: Session = Depends(get_db)):
return [slot_to_dict(slot) for slot in db.query(SlotModel).all()]
@router.get("/dewars/refill-status", response_model=List[DewarSchema])
async def refill_status(db: Session = Depends(get_db)):
slots_needing_refill = db.query(SlotModel).filter(SlotModel.needs_refill == True).all()
dewars = db.query(DewarModel).all()
result = []
current_time = datetime.utcnow()
for slot in slots_needing_refill:
time_until_next_refill = slot.last_refill + timedelta(hours=24) - current_time
result.append({
'slot_id': slot.id,
'needs_refill': slot.needs_refill,
'time_until_refill': str(time_until_next_refill)
})
for dewar in dewars:
last_refill_event = (
db.query(LogisticsEventModel)
.filter(
LogisticsEventModel.dewar_id == dewar.id,
LogisticsEventModel.event_type == 'refill'
)
.order_by(LogisticsEventModel.timestamp.desc())
.first()
)
time_until_refill = None
if last_refill_event:
time_until_refill = last_refill_event.timestamp + timedelta(hours=24) - current_time
dewar_data = DewarSchema(
id=dewar.id,
dewar_name=dewar.dewar_name,
unique_id=dewar.unique_id,
time_until_refill=str(time_until_refill) if time_until_refill else None # Ensure correct format
)
result.append(dewar_data)
return result