Integration of logistics app as a separate frontend to associate a dewar to a storage slot

This commit is contained in:
GotthardG 2024-11-19 10:55:37 +01:00
parent 2a27b61a1f
commit 48fd2c3a7c
2 changed files with 16 additions and 62 deletions

2
.gitignore vendored
View File

@ -1 +1,3 @@
/frontend/.env.local
/backend/ssl/
/backend/app/test.db

View File

@ -5,30 +5,34 @@ from typing import List
from app.models import Dewar as DewarModel, Slot as SlotModel, LogisticsEvent as LogisticsEventModel
from app.schemas import LogisticsEventCreate, Slot as SlotSchema, Dewar as DewarSchema
from app.database import get_db
from app.data import slots_data
import logging
router = APIRouter()
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@router.get("/slots", response_model=List[SlotSchema])
async def get_all_slots(db: Session = Depends(get_db)):
slots = db.query(SlotModel).all()
return slots
@router.get("/dewars", response_model=List[DewarSchema])
async def get_all_dewars(db: Session = Depends(get_db)):
dewars = db.query(DewarModel).all()
return dewars
@router.get("/dewar/{unique_id}", response_model=DewarSchema)
async def get_dewar_by_unique_id(unique_id: str, db: Session = Depends(get_db)):
logger.info(f"Received request for dewar with unique_id: {unique_id}")
dewar = db.query(DewarModel).filter(DewarModel.unique_id == unique_id.strip()).first()
if not dewar:
logger.warning(f"Dewar with unique_id '{unique_id}' not found.")
raise HTTPException(status_code=404, detail="Dewar not found")
logger.info(f"Returning dewar: {dewar}")
return dewar
@router.post("/dewar/scan", response_model=dict)
async def scan_dewar(event_data: LogisticsEventCreate, db: Session = Depends(get_db)):
dewar_qr_code = event_data.dewar_qr_code
@ -43,71 +47,19 @@ async def scan_dewar(event_data: LogisticsEventCreate, db: Session = Depends(get
if transaction_type == 'incoming':
if not slot or slot.occupied:
raise HTTPException(status_code=400, detail="Slot not found or already occupied")
slot.dewar_id = dewar.id
slot.occupied = True
elif transaction_type == 'outgoing':
if not slot or not slot.occupied:
raise HTTPException(status_code=400, detail="Slot not found or not occupied")
if not slot or not slot.occupied or slot.dewar_id != dewar.id:
raise HTTPException(status_code=400, detail="Slot not found or dewar not associated with slot")
slot.dewar_id = None
slot.occupied = False
# Log the event
log_event(db, dewar.id, slot.id if slot else None, transaction_type)
db.commit()
return {"message": "Status updated successfully"}
def log_event(db: Session, dewar_id: int, slot_id: str, event_type: str):
def log_event(db: Session, dewar_id: int, slot_id: int, event_type: str):
new_event = LogisticsEventModel(dewar_id=dewar_id, slot_id=slot_id, event_type=event_type)
db.add(new_event)
db.commit()
# Convert SQLAlchemy model to dictionary (if necessary)
def slot_to_dict(slot: SlotModel) -> dict:
return {
"id": slot.id,
"qr_code": slot.qr_code,
"label": slot.label,
"qr_base": slot.qr_base,
"occupied": slot.occupied,
"needs_refill": slot.needs_refill,
"last_refill": slot.last_refill.isoformat(),
"time_until_refill": str(slot.time_until_refill) if slot.time_until_refill else None # Ensure correct format
}
@router.get("/slots", response_model=List[dict])
def read_slots(db: Session = Depends(get_db)):
return [slot_to_dict(slot) for slot in db.query(SlotModel).all()]
@router.get("/dewars/refill-status", response_model=List[DewarSchema])
async def refill_status(db: Session = Depends(get_db)):
dewars = db.query(DewarModel).all()
result = []
current_time = datetime.utcnow()
for dewar in dewars:
last_refill_event = (
db.query(LogisticsEventModel)
.filter(
LogisticsEventModel.dewar_id == dewar.id,
LogisticsEventModel.event_type == 'refill'
)
.order_by(LogisticsEventModel.timestamp.desc())
.first()
)
time_until_refill = None
if last_refill_event:
time_until_refill = last_refill_event.timestamp + timedelta(hours=24) - current_time
dewar_data = DewarSchema(
id=dewar.id,
dewar_name=dewar.dewar_name,
unique_id=dewar.unique_id,
time_until_refill=str(time_until_refill) if time_until_refill else None # Ensure correct format
)
result.append(dewar_data)
return result