Refactor puck handling and update tell position setting

Simplify and standardize puck name normalization and validation. Refactor `set_tell_positions` to handle batch operations, improve error handling, and provide detailed responses for each puck. Removed unnecessary `with-tell-position` endpoint for better clarity and maintainability.
This commit is contained in:
GotthardG 2024-12-19 15:16:21 +01:00
parent d1bc70665f
commit da5dbb9e31
3 changed files with 121 additions and 147 deletions

View File

@ -291,7 +291,7 @@ shipments = [
pucks = [
Puck(
id=1,
puck_name="PUCK001",
puck_name="PUCK-001",
puck_type="Unipuck",
puck_location_in_dewar=1,
dewar_id=1,

View File

@ -1,7 +1,10 @@
from datetime import datetime
from fastapi import APIRouter, HTTPException, status, Depends
from sqlalchemy.orm import Session
from sqlalchemy.sql import func
from typing import List
import uuid
import re
from app.schemas import (
Puck as PuckSchema,
PuckCreate,
@ -9,7 +12,6 @@ from app.schemas import (
)
from app.models import (
Puck as PuckModel,
Sample as SampleModel,
PuckEvent as PuckEventModel,
Slot as SlotModel,
LogisticsEvent as LogisticsEventModel,
@ -24,174 +26,146 @@ logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def normalize_puck_name(name: str) -> str:
"""
Normalize a puck_name to remove special characters and ensure consistent formatting.
"""
name = str(name).strip().replace(" ", "_").upper()
name = re.sub(r"[^A-Z0-9]", "", name) # Remove special characters
return name
@router.get("/", response_model=List[PuckSchema])
async def get_pucks(db: Session = Depends(get_db)):
return db.query(PuckModel).all()
@router.get("/with-tell-position", response_model=List[dict])
async def get_pucks_with_tell_position(db: Session = Depends(get_db)):
"""
Retrieve all pucks with a `tell_position`
set (not null) and their associated samples.
"""
# Query all pucks that have an event with a non-null tell_position
pucks = (
db.query(PuckModel)
.join(PuckEventModel, PuckModel.id == PuckEventModel.puck_id)
.filter(PuckEventModel.tell_position.isnot(None))
.all()
)
logger.info(f"Pucks with tell position: {pucks}")
if not pucks:
logger.info("No pucks with tell_position found.") # Log for debugging
raise HTTPException(
status_code=404, detail="No pucks with a `tell_position` found."
)
result = []
for puck in pucks:
# Get associated samples for the puck
samples = db.query(SampleModel).filter(SampleModel.puck_id == puck.id).all()
sample_data = [
{
"id": sample.id,
"sample_name": sample.sample_name,
"position": sample.position, # Updated field based on schema
}
for sample in samples
]
# Add puck and sample info to the result
result.append(
{
"id": puck.id,
"puck_name": puck.puck_name,
"puck_type": puck.puck_type,
"puck_location_in_dewar": puck.puck_location_in_dewar,
"dewar_id": puck.dewar_id,
"samples": sample_data, # Add associated samples
}
)
return result
@router.put("/set-tell-positions", status_code=status.HTTP_200_OK)
async def set_tell_positions(
puck_name: str,
segment: str,
puck_in_segment: int,
pucks: List[dict], # Accept a list of puck definitions from the client
db: Session = Depends(get_db),
):
"""
Set the tell position for a puck based on the last beamline event.
- Validates `puck_name`, `segment`, and `puck_in_segment`.
- Finds the most recent logistics event of type "beamline" for the associated dewar.
- Ensures the logistics event is associated with a valid slot.
- Creates a new puck event with the specified `tell_position`.
Set the tell positions for multiple pucks.
Args:
puck_name (str): Name of the puck to set the tell position for.
segment (str): Segment label (A-F).
puck_in_segment (int): Position in the segment (1-5).
pucks (List[dict]): A list of puck definitions, where each puck contains:
- `puck_name` (str): The cleaned name of the puck.
- `segment` (str): The segment in the dewar (e.g., "A-F").
- `puck_in_segment` (int): The position within the segment (1-5).
Returns:
JSON response containing puck and associated information.
List[dict]: A list of responses, one for each successfully processed puck.
"""
results = []
from datetime import datetime
for puck_data in pucks:
try:
# Extract data from input
puck_name = puck_data.get("puckname")
segment = puck_data.get("segment")
puck_in_segment = puck_data.get("puck_in_segment")
# 1. Validate `segment` and `puck_in_segment`
if not segment or segment not in "ABCDEF":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid segment. Valid segments are A, B, C, D, E, F.",
)
if not (1 <= puck_in_segment <= 5):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid puck_in_segment. Valid positions are 1 to 5.",
)
# 1. Validate `segment` and `puck_in_segment`
if not segment or segment not in "ABCDEF":
raise ValueError(
f"Invalid segment '{segment}'. Must be A, B, C, D, E, or F."
)
if not (1 <= puck_in_segment <= 5):
raise ValueError(
f"Invalid puck_in_segment "
f"'{puck_in_segment}'. Must be in range 1-5."
)
# Generate tell_position
tell_position = f"{segment}{puck_in_segment}"
# Generate tell_position
tell_position = f"{segment}{puck_in_segment}"
# 2. Find the puck by its name and ensure it exists
puck = db.query(PuckModel).filter(PuckModel.puck_name == puck_name).first()
if not puck:
print(f"DEBUG: Puck '{puck_name}' not found in the database.")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
# This should match the error returned
detail=f"Puck with name '{puck_name}' not found.",
)
print(f"DEBUG: Found puck: {puck}")
# 2. Find the puck by its cleaned name
normalized_name = normalize_puck_name(puck_name)
# 3. Find the dewar associated with the puck
dewar = db.query(DewarModel).filter(DewarModel.id == puck.dewar_id).first()
if not dewar:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Dewar associated with puck '{puck_name}' not found.",
)
# Use SQLAlchemy to match normalized names
puck = (
db.query(PuckModel)
.filter(
func.replace(func.upper(PuckModel.puck_name), "-", "")
== normalized_name
)
.first()
)
# 4. Find the most recent logistics event for the dewar (type 'beamline')
# and ensure not null
logistics_event = (
db.query(LogisticsEventModel)
.filter(
LogisticsEventModel.dewar_id == dewar.id,
LogisticsEventModel.event_type == "beamline",
)
.order_by(LogisticsEventModel.timestamp.desc())
.first()
)
if not logistics_event:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=(
f"No recent 'beamline' logistics event found for dewar '"
f"{dewar.dewar_name}' "
f"(puck '{puck_name}')."
),
)
if not puck:
raise ValueError(
f"Puck with cleaned name '{puck_name}' not found in the database."
)
# 5. Retrieve the slot from the logistics event
slot = db.query(SlotModel).filter(SlotModel.id == logistics_event.slot_id).first()
if not slot:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=(
f"No slot associated with the most recent 'beamline' logistics event "
f"for dewar '{dewar.dewar_name}'."
),
)
# 3. Find the dewar associated with the puck
dewar = db.query(DewarModel).filter(DewarModel.id == puck.dewar_id).first()
if not dewar:
raise ValueError(f"Dewar associated with puck '{puck_name}' not found.")
# 6. Set the tell position for the puck by creating a PuckEvent
new_puck_event = PuckEventModel(
puck_id=puck.id,
tell_position=tell_position,
event_type="tell_position_set",
timestamp=datetime.utcnow(),
)
db.add(new_puck_event)
db.commit()
db.refresh(new_puck_event)
# 4. Find the most recent logistics event for the dewar (type 'beamline')
logistics_event = (
db.query(LogisticsEventModel)
.filter(
LogisticsEventModel.dewar_id == dewar.id,
LogisticsEventModel.event_type == "beamline",
)
.order_by(LogisticsEventModel.timestamp.desc())
.first()
)
if not logistics_event:
raise ValueError(
f"No recent 'beamline' logistics event found for dewar "
f"'{dewar.dewar_name}'."
)
# 7. Return the result
return {
"puck_id": puck.id,
"puck_name": puck_name,
"dewar_id": dewar.id,
"dewar_name": dewar.dewar_name,
"slot_id": slot.id,
"slot_label": slot.label,
"tell_position": tell_position,
"event_timestamp": new_puck_event.timestamp,
}
# 5. Retrieve the slot from the logistics event
slot = (
db.query(SlotModel)
.filter(SlotModel.id == logistics_event.slot_id)
.first()
)
if not slot:
raise ValueError(
f"No slot associated with the most recent 'beamline' "
f"logistics event "
f"for dewar '{dewar.dewar_name}'."
)
# 6. Create a PuckEvent to set the 'tell_position'
new_puck_event = PuckEventModel(
puck_id=puck.id,
tell_position=tell_position,
event_type="tell_position_set",
timestamp=datetime.utcnow(),
)
db.add(new_puck_event)
db.commit()
db.refresh(new_puck_event)
# Add success result to the results list
results.append(
{
"puck_id": puck.id,
"puck_name": puck.puck_name,
"dewar_id": dewar.id,
"dewar_name": dewar.dewar_name,
"slot_id": slot.id,
"slot_label": slot.label,
"tell_position": tell_position,
"event_timestamp": new_puck_event.timestamp,
}
)
except Exception as e:
# Handle errors for individual pucks and continue processing others
results.append(
{
"puck_name": puck_name,
"error": str(e),
}
)
return results
@router.get("/{puck_id}", response_model=PuckSchema)

View File

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "aareDB"
version = "0.1.0a8"
version = "0.1.0a9"
description = "Backend for next gen sample management system"
authors = [{name = "Guillaume Gotthard", email = "guillaume.gotthard@psi.ch"}]
license = {text = "MIT"}