import json import asyncio from fastapi import APIRouter, Depends from fastapi.encoders import jsonable_encoder from sqlalchemy.orm import Session from starlette.responses import StreamingResponse from app.models import ( JobStatus, Jobs as JobModel, ExperimentParameters as ExperimentParametersModel, Sample as SampleModel, ) from app.schemas import JobsResponse, JobsUpdate from app.dependencies import get_db router = APIRouter() async def job_event_generator(db: Session): while True: jobs = db.query(JobModel).filter(JobModel.status == JobStatus.TODO).all() job_items = [] for job in jobs: sample = db.query(SampleModel).filter_by(id=job.sample_id).first() experiment = ( db.query(ExperimentParametersModel) .filter( ExperimentParametersModel.sample_id == sample.id, ExperimentParametersModel.id == job.run_id, ) .first() ) job_item = JobsResponse( job_id=job.id, sample_id=sample.id, run_id=job.run_id, sample_name=sample.sample_name, status=job.status, type=experiment.type, created_at=job.created_at, updated_at=job.updated_at, data_collection_parameters=sample.data_collection_parameters, experiment_parameters=experiment.beamline_parameters if experiment else None, filepath=experiment.dataset.get("filepath") if experiment and experiment.dataset else None, slurm_id=job.slurm_id, ) job_items.append(job_item) if job_items: # Use Pydantic's .json() for each item, if you need a fine structure, or: serialized = jsonable_encoder(job_items) yield f"data: {json.dumps(serialized)}\n\n" await asyncio.sleep(5) # A reasonable heartbeat/refresh @router.get("/jobs/stream") async def stream_jobs(db: Session = Depends(get_db)): return StreamingResponse(job_event_generator(db), media_type="text/event-stream") @router.post( "/jobs/update_status", response_model=JobsUpdate, operation_id="update_status" ) def update_jobs_status(payload: JobsUpdate, db: Session = Depends(get_db)): # Fetch the job by job_id job = db.query(JobModel).filter(JobModel.id == payload.job_id).first() if not job: # Optionally, use HTTPException for proper status code from fastapi import HTTPException raise HTTPException(status_code=404, detail="Job not found") # Update the status job.status = payload.status job.slurm_id = payload.slurm_id # Optionally update 'updated_at' from datetime import datetime job.updated_at = datetime.now() db.commit() db.refresh(job) # Return the updated job's info as response return JobsUpdate(job_id=job.id, status=job.status, slurm_id=job.slurm_id)