aaredb/backend/app/routers/beamtime.py
GotthardG 102a11eed7 Add beamtime functionality to backend.
Introduce new endpoint and model for managing beamtimes, including shifts and user-specific access. Updated test scripts and data to reflect beamtime integration, along with minor fixes for job status enumeration and example notebook refinement.
2025-05-05 16:05:37 +02:00

73 lines
2.3 KiB
Python

from fastapi import APIRouter, HTTPException, status, Depends
from sqlalchemy.orm import Session
from sqlalchemy import or_
from app.models import Beamtime as BeamtimeModel
from app.schemas import Beamtime as BeamtimeSchema, BeamtimeCreate, loginData
from app.dependencies import get_db
from app.routers.auth import get_current_user
beamtime_router = APIRouter()
@beamtime_router.post("/", response_model=BeamtimeSchema)
async def create_beamtime(
beamtime: BeamtimeCreate,
db: Session = Depends(get_db),
current_user: loginData = Depends(get_current_user),
):
# Validate the pgroup belongs to the current user
if beamtime.pgroups not in current_user.pgroups:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="You do not have permission to create a beamtime for this pgroup.",
)
# Check for existing beamtime for this pgroup, date, and shift
existing = (
db.query(BeamtimeModel)
.filter(
BeamtimeModel.pgroups == beamtime.pgroups,
BeamtimeModel.start_date == beamtime.start_date,
BeamtimeModel.shift == beamtime.shift,
)
.first()
)
if existing:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A beamtime for this pgroup/shift/date already exists.",
)
db_beamtime = BeamtimeModel(
pgroups=beamtime.pgroups,
shift=beamtime.shift,
beamtime_name=beamtime.beamtime_name,
beamline=beamtime.beamline,
start_date=beamtime.start_date,
end_date=beamtime.end_date,
status=beamtime.status,
comments=beamtime.comments,
proposal_id=beamtime.proposal_id,
local_contact_id=beamtime.local_contact_id,
)
db.add(db_beamtime)
db.commit()
db.refresh(db_beamtime)
return db_beamtime
@beamtime_router.get(
"/my-beamtimes",
response_model=list[BeamtimeSchema],
)
async def get_my_beamtimes(
db: Session = Depends(get_db),
current_user: loginData = Depends(get_current_user),
):
user_pgroups = current_user.pgroups
filters = [BeamtimeModel.pgroups.like(f"%{pgroup}%") for pgroup in user_pgroups]
beamtimes = db.query(BeamtimeModel).filter(or_(*filters)).all()
return beamtimes