
Enhanced the models with new fields: a dataset field for Experiment Parameters and a slurm_id for Jobs. Introduced a FAILED status for the JobStatus enum. Updated functionality to handle datasets and trigger job creation based on dataset status.
90 lines
2.8 KiB
Python
90 lines
2.8 KiB
Python
import json
|
|
import asyncio
|
|
from fastapi import APIRouter, Depends
|
|
from fastapi.encoders import jsonable_encoder
|
|
from sqlalchemy.orm import Session
|
|
from starlette.responses import StreamingResponse
|
|
from app.models import (
|
|
JobStatus,
|
|
Jobs as JobModel,
|
|
ExperimentParameters as ExperimentParametersModel,
|
|
Sample as SampleModel,
|
|
)
|
|
from app.schemas import JobsResponse, JobsUpdate
|
|
from app.dependencies import get_db
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
async def job_event_generator(db: Session):
|
|
while True:
|
|
jobs = db.query(JobModel).filter(JobModel.status == JobStatus.TODO).all()
|
|
job_items = []
|
|
for job in jobs:
|
|
sample = db.query(SampleModel).filter_by(id=job.sample_id).first()
|
|
experiment = (
|
|
db.query(ExperimentParametersModel)
|
|
.filter(ExperimentParametersModel.sample_id == sample.id)
|
|
.first()
|
|
)
|
|
|
|
job_item = JobsResponse(
|
|
job_id=job.id,
|
|
sample_id=sample.id,
|
|
run_id=getattr(experiment, "run_number", None),
|
|
sample_name=sample.sample_name,
|
|
status=job.status,
|
|
type=getattr(job, "type", "default_type"),
|
|
created_at=job.created_at,
|
|
updated_at=job.updated_at,
|
|
data_collection_parameters=sample.data_collection_parameters,
|
|
experiment_parameters=experiment.beamline_parameters
|
|
if experiment
|
|
else None,
|
|
slurm_id=None,
|
|
)
|
|
|
|
job_items.append(job_item)
|
|
|
|
if job_items:
|
|
# Use Pydantic's .json() for each item, if you need a fine structure, or:
|
|
serialized = jsonable_encoder(job_items)
|
|
yield f"data: {json.dumps(serialized)}\n\n"
|
|
|
|
await asyncio.sleep(5)
|
|
|
|
|
|
# A reasonable heartbeat/refresh
|
|
|
|
|
|
@router.get("/jobs/stream")
|
|
async def stream_jobs(db: Session = Depends(get_db)):
|
|
return StreamingResponse(job_event_generator(db), media_type="text/event-stream")
|
|
|
|
|
|
@router.post(
|
|
"/jobs/update_status", response_model=JobsUpdate, operation_id="update_status"
|
|
)
|
|
def update_jobs_status(payload: JobsUpdate, db: Session = Depends(get_db)):
|
|
# Fetch the job by job_id
|
|
job = db.query(JobModel).filter(JobModel.id == payload.job_id).first()
|
|
if not job:
|
|
# Optionally, use HTTPException for proper status code
|
|
from fastapi import HTTPException
|
|
|
|
raise HTTPException(status_code=404, detail="Job not found")
|
|
|
|
# Update the status
|
|
job.status = payload.status
|
|
job.slurm_id = payload.slurm_id
|
|
# Optionally update 'updated_at'
|
|
from datetime import datetime
|
|
|
|
job.updated_at = datetime.now()
|
|
|
|
db.commit()
|
|
db.refresh(job)
|
|
|
|
# Return the updated job's info as response
|
|
return JobsUpdate(job_id=job.id, status=job.status, slurm_id=job.slurm_id)
|