245 lines
8.3 KiB
Python

from fastapi import APIRouter, HTTPException, status, Depends
from sqlalchemy.orm import Session
from typing import List
import uuid
from app.schemas import Puck as PuckSchema, PuckCreate, PuckUpdate, SetTellPosition, PuckEvent
from app.models import Puck as PuckModel, Sample as SampleModel, PuckEvent as PuckEventModel, Slot as SlotModel, LogisticsEvent as LogisticsEventModel, Dewar as DewarModel
from app.dependencies import get_db
from datetime import datetime
import logging
# Router object on which the puck endpoints in this module are registered.
router = APIRouter()
# NOTE(review): this configures the root logger as an import-time side effect
# of the module — confirm it is intended here rather than in application setup.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@router.get("/", response_model=List[PuckSchema])
async def get_pucks(db: Session = Depends(get_db)):
    """Return every puck stored in the database.

    Args:
        db: SQLAlchemy session injected through ``get_db``.

    Returns:
        All ``PuckModel`` rows, serialized through ``PuckSchema``.
    """
    every_puck = db.query(PuckModel).all()
    return every_puck
@router.get("/{puck_id}", response_model=PuckSchema)
async def get_puck(puck_id: str, db: Session = Depends(get_db)):
    """Fetch a single puck by its ID.

    Args:
        puck_id: ID of the puck, taken from the URL path.
        db: SQLAlchemy session injected through ``get_db``.

    Returns:
        The matching ``PuckModel`` row.

    Raises:
        HTTPException: 404 when no puck has the given ID.
    """
    matching = db.query(PuckModel).filter(PuckModel.id == puck_id)
    found = matching.first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="Puck not found")
@router.post("/", response_model=PuckSchema, status_code=status.HTTP_201_CREATED)
async def create_puck(puck: PuckCreate, db: Session = Depends(get_db)) -> PuckSchema:
    """Persist a new puck built from the request payload.

    The ID is generated here: ``PUCK-`` followed by the first eight hex
    digits of a random UUID, upper-cased.

    Args:
        puck: Payload carrying the puck's name, type, location in the dewar
            and dewar ID.
        db: SQLAlchemy session injected through ``get_db``.

    Returns:
        The stored row, refreshed from the database after the commit.
    """
    random_suffix = uuid.uuid4().hex[:8].upper()
    column_values = {
        "id": f"PUCK-{random_suffix}",
        "puck_name": puck.puck_name,
        "puck_type": puck.puck_type,
        "puck_location_in_dewar": puck.puck_location_in_dewar,
        "dewar_id": puck.dewar_id,
    }
    new_row = PuckModel(**column_values)

    db.add(new_row)
    db.commit()
    # Reload so the returned object reflects what the database stored.
    db.refresh(new_row)
    return new_row
@router.put("/{puck_id}", response_model=PuckSchema)
async def update_puck(puck_id: str, updated_puck: PuckUpdate, db: Session = Depends(get_db)):
    """Apply the fields set in the payload to an existing puck.

    Args:
        puck_id: ID of the puck to modify, taken from the URL path.
        updated_puck: Payload; only fields the client actually sent are
            applied (``exclude_unset=True``).
        db: SQLAlchemy session injected through ``get_db``.

    Returns:
        The updated row, refreshed from the database after the commit.

    Raises:
        HTTPException: 404 when no puck has the given ID.
    """
    target = db.query(PuckModel).filter(PuckModel.id == puck_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Puck not found")

    changes = updated_puck.dict(exclude_unset=True)
    for attribute, new_value in changes.items():
        setattr(target, attribute, new_value)

    db.commit()
    db.refresh(target)
    return target
@router.delete("/{puck_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_puck(puck_id: str, db: Session = Depends(get_db)):
    """Delete the puck with the given ID.

    Args:
        puck_id: ID of the puck to remove, taken from the URL path.
        db: SQLAlchemy session injected through ``get_db``.

    Returns:
        None; the route is declared with a 204 status code.

    Raises:
        HTTPException: 404 when no puck has the given ID.
    """
    lookup = db.query(PuckModel).filter(PuckModel.id == puck_id)
    doomed = lookup.first()
    if not doomed:
        raise HTTPException(status_code=404, detail="Puck not found")

    db.delete(doomed)
    db.commit()
@router.put("/{puck_id}/tell_position", status_code=status.HTTP_200_OK)
async def set_tell_position(
    puck_id: str,
    request: SetTellPosition,
    db: Session = Depends(get_db)
):
    """Record a new ``tell_position_set`` event for a puck.

    Every call creates a new event row, including calls that clear the
    position.

    Args:
        puck_id: ID of the puck, taken from the URL path. Declared as ``str``
            like the other puck endpoints, so the ``PUCK-XXXXXXXX`` IDs that
            ``create_puck`` generates are accepted; purely numeric IDs still
            pass.
            NOTE(review): confirm against the column type of ``PuckModel.id``
            and ``PuckEventModel.puck_id``.
        request: Payload whose ``tell_position`` is one of ``A1``..``F5``
            (letters A-F, numbers 1-5), or ``"null"`` / ``None`` to clear the
            position.
        db: SQLAlchemy session injected through ``get_db``.

    Returns:
        A dict with a confirmation message, the stored ``tell_position``
        (``None`` when cleared) and the event timestamp.

    Raises:
        HTTPException: 400 when ``tell_position`` is not an accepted value;
            404 when no puck has the given ID.
    """
    tell_position = request.tell_position

    # Kept as a list because the 400 detail below prints it verbatim.
    valid_positions = [f"{letter}{num}" for letter in "ABCDEF" for num in range(1, 6)] + ["null", None]
    if tell_position not in valid_positions:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid tell_position value. Must be one of {valid_positions}.",
        )

    # Refuse to write events for pucks that do not exist, using the same 404
    # as the other puck endpoints.
    puck = db.query(PuckModel).filter(PuckModel.id == puck_id).first()
    if not puck:
        raise HTTPException(status_code=404, detail="Puck not found")

    # The string "null" and a real None both mean "no position"; store None.
    actual_position = None if tell_position in ["null", None] else tell_position

    new_puck_event = PuckEventModel(
        puck_id=puck_id,
        tell_position=actual_position,
        event_type="tell_position_set",
        # Naive datetime expressed in UTC.
        timestamp=datetime.utcnow(),
    )
    db.add(new_puck_event)
    db.commit()
    db.refresh(new_puck_event)

    return {
        "message": "New tell position event created successfully",
        "tell_position": new_puck_event.tell_position,
        "timestamp": new_puck_event.timestamp,
    }
@router.get("/{puck_id}/last-tell-position", status_code=status.HTTP_200_OK)
async def get_last_tell_position(puck_id: str, db: Session = Depends(get_db)):
    """Return the most recent ``tell_position_set`` event of a puck.

    Args:
        puck_id: ID of the puck, taken from the URL path.
        db: SQLAlchemy session injected through ``get_db``.

    Returns:
        A dict with the puck ID, the event's ``tell_position`` and its
        timestamp.

    Raises:
        HTTPException: 404 when the puck has no ``tell_position_set`` event.
    """
    position_events = db.query(PuckEventModel).filter(
        PuckEventModel.puck_id == puck_id,
        PuckEventModel.event_type == "tell_position_set",
    )
    # Newest first, so the first row is the latest event.
    newest = position_events.order_by(PuckEventModel.timestamp.desc()).first()

    if newest:
        return {
            "puck_id": puck_id,
            "tell_position": newest.tell_position,
            "timestamp": newest.timestamp,
        }

    raise HTTPException(
        status_code=404,
        detail=f"No 'tell_position' event found for puck with ID {puck_id}",
    )
# Beamline keyword -> slot ID. Two keywords map to each slot ID.
# NOTE(review): the IDs are hard-coded; presumably they match rows in the
# slots table — confirm.
_SLOT_ALIASES = {
    "PXI": 47,
    "PXII": 48,
    "PXIII": 49,
    "X06SA": 47,
    "X10SA": 48,
    "X06DA": 49,
}


def _resolve_slot_identifier(slot_identifier: str) -> tuple:
    """Translate a numeric slot ID or beamline keyword into ``(slot_id, alias)``.

    ``slot_id`` is ``None`` when the identifier is neither an integer nor a
    known keyword. ``alias`` is the label used in log and error messages.
    """
    try:
        slot_id = int(slot_identifier)
    except ValueError:
        # Not a number: treat it as a case-insensitive keyword.
        alias = slot_identifier.upper()
        return _SLOT_ALIASES.get(alias), alias
    # Numeric ID: report it under its first matching keyword, if any.
    alias = next(
        (keyword for keyword, known_id in _SLOT_ALIASES.items() if known_id == slot_id),
        slot_identifier,
    )
    return slot_id, alias


@router.get("/slot/{slot_identifier}", response_model=List[dict])
async def get_pucks_by_slot(
    slot_identifier: str,
    db: Session = Depends(get_db)
):
    """
    Retrieve all pucks associated with all dewars linked to the given slot
    (by ID or keyword) via 'beamline' events.

    - Accepts a numeric slot ID or one of the keywords PXI, PXII, PXIII,
      X06SA, X10SA, X06DA (case-insensitive).
    - Retrieves all dewars (and their names) associated with the slot.

    Args:
        slot_identifier: Numeric slot ID or beamline keyword from the URL path.
        db: SQLAlchemy session injected through ``get_db``.

    Returns:
        One dict per puck with the keys ``id``, ``puck_name``, ``puck_type``,
        ``dewar_id`` and ``dewar_name``.

    Raises:
        HTTPException: 400 when the identifier cannot be resolved to a slot
            ID; 404 when the slot does not exist, or when it has no beamline
            events, no dewars, or no pucks.
    """
    slot_id, alias = _resolve_slot_identifier(slot_identifier)
    # Falsy covers both an unknown keyword (None) and a numeric 0.
    if not slot_id:
        raise HTTPException(
            status_code=400,
            detail="Invalid slot identifier. Must be an ID or one of the following: PXI, PXII, PXIII, X06SA, X10SA, X06DA."
        )

    slot = db.query(SlotModel).filter(SlotModel.id == slot_id).first()
    if not slot:
        raise HTTPException(
            status_code=404,
            detail=f"Slot not found for identifier '{alias}'."
        )
    logger.info(f"Slot found: ID={slot.id}, Label={slot.label}")

    # No ORDER BY: the rows are only counted and reduced to a set of dewar IDs.
    beamline_events = (
        db.query(LogisticsEventModel)
        .filter(
            LogisticsEventModel.slot_id == slot_id,
            LogisticsEventModel.event_type == "beamline"
        )
        .all()
    )
    if not beamline_events:
        logger.warning(f"No dewars associated to this beamline '{alias}'.")
        raise HTTPException(
            status_code=404,
            detail=f"No dewars found for the given beamline '{alias}'."
        )
    logger.info(f"Found {len(beamline_events)} beamline events for slot_id={slot_id}.")

    dewar_ids = {event.dewar_id for event in beamline_events if event.dewar_id}
    # Skip the query (and an empty IN clause) when no event carries a dewar ID.
    dewars = (
        db.query(DewarModel).filter(DewarModel.id.in_(dewar_ids)).all()
        if dewar_ids
        else []
    )
    if not dewars:
        logger.warning(f"No dewars found for beamline '{alias}'.")
        raise HTTPException(
            status_code=404,
            detail=f"No dewars found for beamline '{alias}'."
        )
    logger.info(f"Found {len(dewars)} dewars for beamline '{alias}'.")

    # dewar_id -> dewar_name, used to filter pucks and to label the output.
    dewar_mapping = {dewar.id: dewar.dewar_name for dewar in dewars}

    puck_list = (
        db.query(PuckModel)
        .filter(PuckModel.dewar_id.in_(list(dewar_mapping)))
        .all()
    )
    if not puck_list:
        logger.warning(f"No pucks found for dewars associated with beamline '{alias}'.")
        raise HTTPException(
            status_code=404,
            detail=f"No pucks found for dewars associated with beamline '{alias}'."
        )
    logger.info(f"Found {len(puck_list)} pucks for beamline '{alias}'.")

    return [
        {
            "id": puck.id,
            "puck_name": puck.puck_name,
            "puck_type": puck.puck_type,
            "dewar_id": puck.dewar_id,
            "dewar_name": dewar_mapping.get(puck.dewar_id),
        }
        for puck in puck_list
    ]