aaredb/backend/app/routers/processing.py
GotthardG b13a3e23f4 Add job cancellation handling and periodic cleanup logic
Introduce new statuses, "to_cancel" and "cancelled", to improve job state tracking. Implement logic to nullify `slurm_id` for cancelled jobs and a background thread to clean up cancelled jobs older than 2 hours. Ensure periodic cleanup runs hourly to maintain database hygiene.
2025-05-02 10:48:54 +02:00

124 lines
3.9 KiB
Python

import json
import asyncio
from fastapi import APIRouter, Depends
from fastapi.encoders import jsonable_encoder
from sqlalchemy.orm import Session
from starlette.responses import StreamingResponse
from app.models import (
Jobs as JobModel,
ExperimentParameters as ExperimentParametersModel,
Sample as SampleModel,
)
from app.schemas import JobsResponse, JobsUpdate
from app.dependencies import get_db
router = APIRouter()
async def job_event_generator(get_db):
while True:
# Open a new session for this iteration and close it at the end
with next(get_db()) as db:
jobs = db.query(JobModel).all()
job_items = []
for job in jobs:
sample = db.query(SampleModel).filter_by(id=job.sample_id).first()
experiment = (
db.query(ExperimentParametersModel)
.filter(
ExperimentParametersModel.sample_id == sample.id,
ExperimentParametersModel.id == job.run_id,
)
.first()
)
job_item = JobsResponse(
job_id=job.id,
sample_id=sample.id,
run_id=job.run_id,
sample_name=sample.sample_name,
status=job.status,
type=experiment.type if experiment else None,
created_at=job.created_at,
updated_at=job.updated_at,
data_collection_parameters=sample.data_collection_parameters,
experiment_parameters=experiment.beamline_parameters
if experiment
else None,
filepath=experiment.dataset.get("filepath")
if experiment and experiment.dataset
else None,
slurm_id=job.slurm_id,
)
job_items.append(job_item)
if job_items:
serialized = jsonable_encoder(job_items)
yield f"data: {json.dumps(serialized)}\n\n"
await asyncio.sleep(5)
# A reasonable heartbeat/refresh
@router.get("/jobs/stream")
async def stream_jobs():
# Pass the dependency itself, not an active session
from app.dependencies import get_db
return StreamingResponse(
job_event_generator(get_db), media_type="text/event-stream"
)
@router.post(
"/jobs/update_status", response_model=JobsUpdate, operation_id="update_status"
)
def update_jobs_status(payload: JobsUpdate, db: Session = Depends(get_db)):
# Fetch the job by job_id
job = db.query(JobModel).filter(JobModel.id == payload.job_id).first()
if not job:
# Optionally, use HTTPException for proper status code
from fastapi import HTTPException
raise HTTPException(status_code=404, detail="Job not found")
# If status is being updated to "cancelled"
if payload.status == "cancelled":
job.slurm_id = None
# Update the status
job.status = payload.status
job.slurm_id = payload.slurm_id
# Optionally update 'updated_at'
from datetime import datetime
job.updated_at = datetime.now()
db.commit()
db.refresh(job)
# Return the updated job's info as response
return JobsUpdate(job_id=job.id, status=job.status, slurm_id=job.slurm_id)
def cleanup_cancelled_jobs(db: Session):
from datetime import datetime
from datetime import timedelta
"""Delete jobs in 'cancelled' state for more than 2 hours."""
cutoff = datetime.now() - timedelta(hours=2)
print(
f"Cleaning up cancelled jobs older than {cutoff} "
f"(current time: {datetime.now()})"
)
old_jobs = (
db.query(JobModel)
.filter(JobModel.status == "cancelled", JobModel.updated_at < cutoff)
.all()
)
for job in old_jobs:
db.delete(job)
db.commit()