aaredb/backend/app/routers/beamtime.py
GotthardG 4328b84795 Add beamtime relationships and enhance sample handling
This commit adds relationships to link Pucks and Samples to Beamtime in the models, enabling better data association. Includes changes to assign beamtime IDs during data generation and updates in API response models for improved data loading. Removed redundant code in testfunctions.ipynb to clean up the notebook.
2025-05-06 11:28:36 +02:00

83 lines
2.4 KiB
Python

from fastapi import APIRouter, HTTPException, status, Depends
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import or_
from app.models import Beamtime as BeamtimeModel
from app.schemas import (
Beamtime as BeamtimeSchema,
BeamtimeCreate,
loginData,
BeamtimeResponse,
)
from app.dependencies import get_db
from app.routers.auth import get_current_user
beamtime_router = APIRouter()
@beamtime_router.post("/", response_model=BeamtimeSchema)
async def create_beamtime(
beamtime: BeamtimeCreate,
db: Session = Depends(get_db),
current_user: loginData = Depends(get_current_user),
):
# Validate the pgroup belongs to the current user
if beamtime.pgroups not in current_user.pgroups:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="You do not have permission to create a beamtime for this pgroup.",
)
# Check for existing beamtime for this pgroup, date, and shift
existing = (
db.query(BeamtimeModel)
.filter(
BeamtimeModel.pgroups == beamtime.pgroups,
BeamtimeModel.start_date == beamtime.start_date,
BeamtimeModel.shift == beamtime.shift,
)
.first()
)
if existing:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A beamtime for this pgroup/shift/date already exists.",
)
db_beamtime = BeamtimeModel(
pgroups=beamtime.pgroups,
shift=beamtime.shift,
beamtime_name=beamtime.beamtime_name,
beamline=beamtime.beamline,
start_date=beamtime.start_date,
end_date=beamtime.end_date,
status=beamtime.status,
comments=beamtime.comments,
proposal_id=beamtime.proposal_id,
local_contact_id=beamtime.local_contact_id,
)
db.add(db_beamtime)
db.commit()
db.refresh(db_beamtime)
return db_beamtime
@beamtime_router.get(
"/my-beamtimes",
response_model=list[BeamtimeResponse],
)
async def get_my_beamtimes(
db: Session = Depends(get_db),
current_user: loginData = Depends(get_current_user),
):
user_pgroups = current_user.pgroups
filters = [BeamtimeModel.pgroups.like(f"%{pgroup}%") for pgroup in user_pgroups]
beamtimes = (
db.query(BeamtimeModel)
.options(joinedload(BeamtimeModel.local_contact))
.filter(or_(*filters))
.all()
)
return beamtimes