GotthardG 9e5ae2b43c Refactor puck event handling and add tell filtering
Updated puck event queries to improve robustness and ensure distinct results. Introduced filtering by `tell` in specific API endpoints and added validation for `tell` values. Incremented project version to 0.1.0a21 to reflect API changes.
2025-02-04 17:07:25 +01:00

661 lines
22 KiB
Python

from datetime import datetime
from fastapi import APIRouter, HTTPException, status, Depends
from sqlalchemy.orm import Session
from sqlalchemy.sql import func
from typing import List
import uuid
import re
from app.schemas import (
Puck as PuckSchema,
PuckCreate,
PuckUpdate,
PuckWithTellPosition,
Sample,
SetTellPositionRequest,
DataCollectionParameters,
)
from app.models import (
Puck as PuckModel,
PuckEvent as PuckEventModel,
Sample as SampleModel,
LogisticsEvent as LogisticsEventModel,
Dewar as DewarModel,
)
from app.dependencies import get_db
import logging
router = APIRouter()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
VALID_TELL_OPTIONS = {"X06SA", "X06DA", "X10SA"}
def validate_tell(tell: str):
if tell not in VALID_TELL_OPTIONS:
raise ValueError(
f"Invalid tell: {tell}. Must be one of {', '.join(VALID_TELL_OPTIONS)}"
)
def normalize_puck_name(name: str) -> str:
"""
Normalize a puck_name to remove special characters and ensure consistent formatting.
"""
name = re.sub(r"[^A-Z0-9]", "", name.upper()) # Remove special characters
return name
def resolve_slot_id(slot_identifier: str) -> int:
"""
Convert a slot identifier (either numeric or alias) to a numeric slot ID.
Args:
slot_identifier (str): The slot identifier to resolve (e.g., "PXI",
"PXII", "48").
Returns:
int: The numeric slot ID corresponding to the identifier.
Raises:
HTTPException: If the slot identifier is invalid or unrecognized.
"""
# Map slot identifier keywords to numeric slot IDs
slot_aliases = {
"PXI": 47,
"PXII": 48,
"PXIII": 49,
"X06SA": 47,
"X10SA": 48,
"X06DA": 49,
}
# Try to resolve the identifier
try:
return int(slot_identifier) # If it's a numeric slot ID, return it directly
except ValueError:
# Convert alias to slot ID using the mapping
slot_id = slot_aliases.get(slot_identifier.upper())
if slot_id:
return slot_id
# Log error and raise an exception for invalid identifiers
logger.error(f"Invalid slot identifier: {slot_identifier}")
raise HTTPException(
status_code=400, detail=f"Invalid slot identifier: {slot_identifier}"
)
def get_pucks_at_beamline(slot_id: int, db: Session) -> List[PuckWithTellPosition]:
"""
Fetch all pucks currently located at the beamline for a given slot ID.
"""
# Subquery: Latest logistic event for each dewar
latest_event_subquery = (
db.query(
LogisticsEventModel.dewar_id.label("dewar_id"),
func.max(LogisticsEventModel.timestamp).label("latest_event_time"),
)
.group_by(LogisticsEventModel.dewar_id)
.subquery(name="latest_event_subquery")
)
# Query dewars in the slot with the latest event "beamline"
dewars = (
db.query(DewarModel)
.join(LogisticsEventModel, DewarModel.id == LogisticsEventModel.dewar_id)
.join(
latest_event_subquery,
(LogisticsEventModel.dewar_id == latest_event_subquery.c.dewar_id)
& (
LogisticsEventModel.timestamp
== latest_event_subquery.c.latest_event_time
),
)
.filter(
LogisticsEventModel.slot_id == slot_id,
LogisticsEventModel.event_type == "beamline",
)
.all()
)
if not dewars:
logger.warning(f"No dewars found for slot ID: {slot_id}")
return []
# Map dewars to their details
dewar_ids = [dewar.id for dewar in dewars]
dewar_map = {dewar.id: dewar.dewar_name for dewar in dewars}
dewar_pgroups = {dewar.id: dewar.pgroups for dewar in dewars}
# Subquery: Latest event for each puck
latest_puck_event_subquery = (
db.query(
PuckEventModel.puck_id.label("puck_id"),
func.max(PuckEventModel.timestamp).label("latest_event_time"),
)
.group_by(PuckEventModel.puck_id)
.subquery(name="latest_event_subquery")
)
# Query pucks for the selected dewars
pucks_with_latest_events = (
db.query(
PuckModel,
PuckEventModel.event_type,
PuckEventModel.tell_position,
PuckEventModel.timestamp, # Useful for debugging or edge cases
DewarModel,
)
.join(
latest_puck_event_subquery,
PuckModel.id == latest_puck_event_subquery.c.puck_id,
isouter=True,
)
.join(
PuckEventModel,
(PuckEventModel.puck_id == latest_puck_event_subquery.c.puck_id)
& (
PuckEventModel.timestamp
== latest_puck_event_subquery.c.latest_event_time
),
isouter=True,
)
.join(DewarModel, PuckModel.dewar_id == DewarModel.id, isouter=True)
.filter(PuckModel.dewar_id.in_(dewar_ids))
.distinct() # Ensure no duplicates
.all()
)
# Prepare the results
results = {}
for (
puck,
event_type,
tell_position,
event_timestamp,
dewar,
) in pucks_with_latest_events:
dewar_name = dewar_map.get(puck.dewar_id)
pgroup = dewar_pgroups.get(puck.dewar_id)
# If the event is None or explicitly a "puck_removed", set `tell_position=None`
if event_type is None or event_type == "puck_removed":
tell_position = None
# Always replace results since we are processing the latest event
results[puck.id] = PuckWithTellPosition(
id=puck.id,
pgroup=pgroup,
puck_name=puck.puck_name,
puck_type=puck.puck_type,
puck_location_in_dewar=int(puck.puck_location_in_dewar)
if puck.puck_location_in_dewar
else None,
dewar_id=puck.dewar_id,
dewar_name=dewar_name,
tell_position=tell_position, # Respect if `None` is explicitly set
)
return list(results.values())
@router.get("/", response_model=List[PuckSchema])
async def get_pucks(db: Session = Depends(get_db)):
return db.query(PuckModel).all()
@router.put("/set-tell-positions", status_code=status.HTTP_200_OK)
async def set_tell_positions(
payload: SetTellPositionRequest, # Accept the wrapped request as a single payload
db: Session = Depends(get_db),
):
# Extract the tell position (slot identifier) and the list of pucks
tell = payload.tell
pucks = payload.pucks
try:
# Resolve slot ID from the provided identifier
slot_id = resolve_slot_id(tell)
except Exception as e:
raise HTTPException(
status_code=400,
detail=f"Invalid tell or slot identifier: {tell}." f" Error: {str(e)}",
)
# Fetch existing pucks at the beamline slot
pucks_at_beamline = get_pucks_at_beamline(slot_id, db)
beamline_puck_map = {
normalize_puck_name(puck.puck_name): puck for puck in pucks_at_beamline
}
# Check if the payload has any pucks
if not pucks: # Empty payload case
results = []
# Deduplicate pucks based on ID
unique_pucks_at_beamline = {
puck.id: puck for puck in pucks_at_beamline
}.values()
for puck in unique_pucks_at_beamline:
# Fetch the most recent event for the puck
last_event = (
db.query(PuckEventModel)
.filter(
PuckEventModel.puck_id == puck.id,
)
.order_by(
PuckEventModel.id.desc()
) # Order by timestamp ensures we get the latest event
.first()
)
# Log the last event for the puck
if last_event:
logger.info(
f"Processing puck: {puck.puck_name}, "
f"Last Event -> Type: {last_event.event_type}, "
f"Tell Position: {last_event.tell_position}, "
f"Timestamp: {last_event.timestamp}"
)
else:
logger.info(
f"Processing puck: {puck.puck_name}, No events found for this puck"
)
# Remove all pucks, including those without events or with None
# tell_position
if last_event.tell_position is not None:
try:
# Add a puck_removed event
remove_event = PuckEventModel(
puck_id=puck.id,
tell=None, # Nullify the `tell` for removal
tell_position=None,
event_type="puck_removed",
timestamp=datetime.now(),
)
db.add(remove_event)
# Record this removal in the response
results.append(
{
"puck_name": puck.puck_name,
"tell": tell,
"removed_position": last_event.tell_position
if last_event
else None,
"status": "removed",
"message": "Puck removed due to empty payload.",
}
)
except Exception as e:
# Handle and log the error for this particular puck
results.append(
{
"puck_name": puck.puck_name,
"error": str(e),
}
)
# Commit all removal events and return the results
db.commit()
return results
# If the payload contains pucks, continue with the regular logic
results = []
processed_pucks = (
set()
) # To track pucks that have been processed (unchanged or updated)
# Existing pucks' most recent events
last_events_map = {
puck.id: db.query(PuckEventModel)
.filter(
PuckEventModel.puck_id == puck.id,
)
.order_by(PuckEventModel.id.desc())
.first()
for puck in pucks_at_beamline
}
# Step 1: Process each puck in the payload
for puck_data in pucks:
try:
puck_name = puck_data.puck_name
normalized_name = normalize_puck_name(puck_name)
new_position = puck_data.tell_position
existing_puck = beamline_puck_map.get(normalized_name)
if not existing_puck:
# If the puck is not found, it's a potential error
results.append(
{
"puck_name": puck_name,
"error": f"Puck '{puck_name}' not found at the beamline.",
}
)
continue
processed_pucks.add(existing_puck.id) # Mark this puck as processed
# Check if the tell position is unchanged
last_event = last_events_map.get(existing_puck.id)
if last_event and last_event.tell_position == new_position:
results.append(
{
"puck_name": puck_name,
"tell": tell,
"current_position": new_position,
"status": "unchanged",
"message": "No change in tell_position.",
}
)
continue
# Add a "puck_removed" event if the position is being changed (old
# position removed)
if last_event and last_event.tell_position is not None:
remove_event = PuckEventModel(
puck_id=existing_puck.id,
tell=None,
tell_position=None,
event_type="puck_removed",
timestamp=datetime.now(),
)
db.add(remove_event)
# Add a new "tell_position_set" event (new position)
new_event = PuckEventModel(
puck_id=existing_puck.id,
tell=tell,
tell_position=new_position,
event_type="tell_position_set",
timestamp=datetime.now(),
)
db.add(new_event)
results.append(
{
"puck_name": puck_name,
"tell": tell,
"new_position": new_position,
"previous_position": (
last_event.tell_position if last_event else None
),
"status": "updated",
"message": "Tell position updated successfully.",
}
)
except Exception as e:
results.append(
{
"puck_name": puck_name,
"error": str(e),
}
)
# Step 2: Handle "absent" pucks for removal
for puck in pucks_at_beamline:
if puck.id not in processed_pucks: # This puck was not in the payload
last_event = last_events_map.get(
puck.id
) # Fetch the last event for the puck
# Only remove pucks that have a valid last event with a non-null
# tell_position
if last_event and last_event.tell_position is not None:
try:
remove_event = PuckEventModel(
puck_id=puck.id,
tell=None,
tell_position=None,
event_type="puck_removed",
timestamp=datetime.now(),
)
db.add(remove_event)
results.append(
{
"puck_name": puck.puck_name,
"tell": tell,
"removed_position": last_event.tell_position,
"status": "removed",
"message": "Puck removed from tell_position.",
}
)
except Exception as e:
results.append(
{
"puck_name": puck.puck_name,
"error": str(e),
}
)
# Step 3: Commit all changes to the DB
db.commit()
return results
@router.get("/with-tell-position", response_model=List[PuckWithTellPosition])
async def get_pucks_with_tell_position(
tell: str, # Specify tell as a query parameter
db: Session = Depends(get_db),
):
"""
Retrieve all pucks with a valid `tell_position` set (non-null),
their associated samples, and the latest `tell_position` value (if any),
filtered by a specific `tell`.
"""
# Validate the incoming `tell` value
try:
validate_tell(tell) # Ensure `tell` is valid using predefined valid options
except ValueError as error:
raise HTTPException(
status_code=400, detail=str(error)
) # Raise error for invalid tells
# Step 1: Prepare a subquery to fetch the latest event timestamp for each puck.
latest_event_subquery = (
db.query(
PuckEventModel.puck_id,
func.max(PuckEventModel.timestamp).label("latest_timestamp"),
)
.group_by(PuckEventModel.puck_id) # Group by puck
.subquery()
)
# Step 2: Main query - fetch pucks with latest `tell_position` event details
pucks_with_events = (
db.query(PuckModel, PuckEventModel, DewarModel)
.join(PuckEventModel, PuckModel.id == PuckEventModel.puck_id)
.join(
latest_event_subquery,
(PuckEventModel.puck_id == latest_event_subquery.c.puck_id)
& (PuckEventModel.timestamp == latest_event_subquery.c.latest_timestamp),
)
.outerjoin(
DewarModel, PuckModel.dewar_id == DewarModel.id
) # Optional, include related dewar info
.filter(
PuckEventModel.tell_position.isnot(None)
) # Only include non-null `tell_position`
.filter(
PuckEventModel.event_type == "tell_position_set"
) # Only include relevant event types
.filter(PuckEventModel.tell == tell) # Filter by the specific `tell` variable
.all()
)
# Return an empty list if no relevant pucks are found
if not pucks_with_events:
return []
# Step 3: Construct the response with pucks and their valid tell_position
results = []
for puck, event, dewar in pucks_with_events:
# Fetch associated samples for this puck
samples = db.query(SampleModel).filter(SampleModel.puck_id == puck.id).all()
# Construct the response model
results.append(
PuckWithTellPosition(
id=int(puck.id),
puck_name=str(puck.puck_name),
puck_type=str(puck.puck_type),
puck_location_in_dewar=int(puck.puck_location_in_dewar)
if puck.puck_location_in_dewar
else None,
dewar_id=int(puck.dewar_id) if puck.dewar_id else None,
dewar_name=str(dewar.dewar_name)
if dewar and dewar.dewar_name
else None,
pgroup=str(dewar.pgroups)
if dewar and dewar.pgroups
else None, # Replace later by puck pgroup if needed
samples=[
Sample(
id=sample.id,
sample_name=sample.sample_name,
mount_count=sample.mount_count,
position=sample.position,
puck_id=sample.puck_id,
data_collection_parameters=(
DataCollectionParameters(
**sample.data_collection_parameters
)
if isinstance(sample.data_collection_parameters, dict)
else sample.data_collection_parameters
),
)
for sample in samples
],
tell_position=str(event.tell_position),
)
)
return results
@router.get("/{puck_id}", response_model=PuckSchema)
async def get_puck(puck_id: str, db: Session = Depends(get_db)):
puck = db.query(PuckModel).filter(PuckModel.id == puck_id).first()
if not puck:
raise HTTPException(status_code=404, detail="Puck not found")
return puck
@router.post("/", response_model=PuckSchema, status_code=status.HTTP_201_CREATED)
async def create_puck(puck: PuckCreate, db: Session = Depends(get_db)) -> PuckSchema:
puck_id = f"PUCK-{uuid.uuid4().hex[:8].upper()}"
db_puck = PuckModel(
id=puck_id,
puck_name=puck.puck_name,
puck_type=puck.puck_type,
puck_location_in_dewar=puck.puck_location_in_dewar,
dewar_id=puck.dewar_id,
)
db.add(db_puck)
db.commit()
db.refresh(db_puck)
return db_puck
@router.put("/{puck_id}", response_model=PuckSchema)
async def update_puck(
puck_id: str, updated_puck: PuckUpdate, db: Session = Depends(get_db)
):
puck = db.query(PuckModel).filter(PuckModel.id == puck_id).first()
if not puck:
raise HTTPException(status_code=404, detail="Puck not found")
for key, value in updated_puck.dict(exclude_unset=True).items():
setattr(puck, key, value)
db.commit()
db.refresh(puck)
return puck
@router.delete("/{puck_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_puck(puck_id: str, db: Session = Depends(get_db)):
puck = db.query(PuckModel).filter(PuckModel.id == puck_id).first()
if not puck:
raise HTTPException(status_code=404, detail="Puck not found")
db.delete(puck)
db.commit()
return
@router.get("/{puck_id}/last-tell-position", status_code=status.HTTP_200_OK)
async def get_last_tell_position(puck_id: str, db: Session = Depends(get_db)):
# Query the most recent tell_position_set event for the given puck_id
last_event = (
db.query(PuckEventModel)
.filter(
PuckEventModel.puck_id == puck_id,
PuckEventModel.event_type == "tell_position_set",
)
.order_by(PuckEventModel.timestamp.desc())
.first()
)
# If no event is found, return a 404 error
if not last_event:
raise HTTPException(
status_code=404,
detail=f"No 'tell_position' event found for puck with ID {puck_id}",
)
# Return the details of the last tell_position event
return {
"puck_id": puck_id,
"tell_position": last_event.tell_position,
"timestamp": last_event.timestamp,
}
@router.get("/slot/{slot_identifier}", response_model=List[PuckWithTellPosition])
async def get_pucks_by_slot(slot_identifier: str, db: Session = Depends(get_db)):
"""
Retrieve all pucks in a slot, reporting their latest event and
`tell_position` value.
Args:
slot_identifier (str): The slot identifier (e.g., "PXI", "48").
db (Session): Database session dependency.
Returns:
List[PuckWithTellPosition]: List of pucks in the specified slot.
"""
# Resolve the slot identifier to a numeric slot ID using the function
try:
slot_id = resolve_slot_id(slot_identifier)
except HTTPException as e:
logger.error(
f"Failed to resolve slot identifier: {slot_identifier}. Error: {e.detail}"
)
raise e
logger.info(f"Resolved slot identifier '{slot_identifier}' to Slot ID: {slot_id}")
# Fetch the pucks at the beamline for the resolved slot ID
pucks = get_pucks_at_beamline(slot_id, db)
if not pucks:
logger.warning(
f"No pucks found for the slot '{slot_identifier}' (ID: {slot_id})"
)
raise HTTPException(
status_code=404, detail=f"No pucks found for slot '{slot_identifier}'"
)
logger.info(
f"Found {len(pucks)} pucks for slot '{slot_identifier}' (ID: {slot_id})"
)
return pucks