From b13a3e23f40a5db6c8bb3fb949c4064f9d145bef Mon Sep 17 00:00:00 2001
From: GotthardG <51994228+GotthardG@users.noreply.github.com>
Date: Fri, 2 May 2025 10:48:54 +0200
Subject: [PATCH] Add job cancellation handling and periodic cleanup logic

Introduce new statuses, "to_cancel" and "cancelled", to improve job state
tracking. Implement logic to nullify `slurm_id` for cancelled jobs and a
background thread to clean up cancelled jobs older than 2 hours. Ensure
periodic cleanup runs hourly to maintain database hygiene.
---
 backend/app/routers/processing.py      | 82 ++++++++++++++------------
 frontend/src/components/ResultGrid.tsx | 64 +++++++++++++++++++-
 2 files changed, 106 insertions(+), 40 deletions(-)

diff --git a/backend/app/routers/processing.py b/backend/app/routers/processing.py
index 4658f9a..21425f9 100644
--- a/backend/app/routers/processing.py
+++ b/backend/app/routers/processing.py
@@ -5,7 +5,6 @@ from fastapi.encoders import jsonable_encoder
 from sqlalchemy.orm import Session
 from starlette.responses import StreamingResponse
 from app.models import (
-    JobStatus,
     Jobs as JobModel,
     ExperimentParameters as ExperimentParametersModel,
     Sample as SampleModel,
@@ -16,46 +15,46 @@ from app.dependencies import get_db
 router = APIRouter()
 
 
-async def job_event_generator(db: Session):
+async def job_event_generator(get_db):
     while True:
-        jobs = db.query(JobModel).filter(JobModel.status == JobStatus.TODO).all()
-        job_items = []
-        for job in jobs:
-            sample = db.query(SampleModel).filter_by(id=job.sample_id).first()
-            experiment = (
-                db.query(ExperimentParametersModel)
-                .filter(
-                    ExperimentParametersModel.sample_id == sample.id,
-                    ExperimentParametersModel.id == job.run_id,
+        # Open a new session for this iteration and close it at the end
+        with next(get_db()) as db:
+            jobs = db.query(JobModel).all()
+            job_items = []
+            for job in jobs:
+                sample = db.query(SampleModel).filter_by(id=job.sample_id).first()
+                experiment = (
+                    db.query(ExperimentParametersModel)
+                    .filter(
+                        ExperimentParametersModel.sample_id == sample.id,
+                        ExperimentParametersModel.id == job.run_id,
+                    )
+                    .first()
                 )
-                .first()
-            )
-            job_item = JobsResponse(
-                job_id=job.id,
-                sample_id=sample.id,
-                run_id=job.run_id,
-                sample_name=sample.sample_name,
-                status=job.status,
-                type=experiment.type,
-                created_at=job.created_at,
-                updated_at=job.updated_at,
-                data_collection_parameters=sample.data_collection_parameters,
-                experiment_parameters=experiment.beamline_parameters
-                if experiment
-                else None,
-                filepath=experiment.dataset.get("filepath")
-                if experiment and experiment.dataset
-                else None,
-                slurm_id=job.slurm_id,
-            )
+                job_item = JobsResponse(
+                    job_id=job.id,
+                    sample_id=sample.id,
+                    run_id=job.run_id,
+                    sample_name=sample.sample_name,
+                    status=job.status,
+                    type=experiment.type if experiment else None,
+                    created_at=job.created_at,
+                    updated_at=job.updated_at,
+                    data_collection_parameters=sample.data_collection_parameters,
+                    experiment_parameters=experiment.beamline_parameters
+                    if experiment
+                    else None,
+                    filepath=experiment.dataset.get("filepath")
+                    if experiment and experiment.dataset
+                    else None,
+                    slurm_id=job.slurm_id,
+                )
+                job_items.append(job_item)
 
-            job_items.append(job_item)
-
-        if job_items:
-            # Use Pydantic's .json() for each item, if you need a fine structure, or:
-            serialized = jsonable_encoder(job_items)
-            yield f"data: {json.dumps(serialized)}\n\n"
+            if job_items:
+                serialized = jsonable_encoder(job_items)
+                yield f"data: {json.dumps(serialized)}\n\n"
 
         await asyncio.sleep(5)
 
@@ -64,8 +63,13 @@ async def job_event_generator(db: Session):
 
 
 @router.get("/jobs/stream")
-async def stream_jobs(db: Session = Depends(get_db)):
-    return StreamingResponse(job_event_generator(db), media_type="text/event-stream")
+async def stream_jobs():
+    # Pass the dependency itself, not an active session
+    from app.dependencies import get_db
+
+    return StreamingResponse(
+        job_event_generator(get_db), media_type="text/event-stream"
+    )
 
 
 @router.post(
diff --git a/frontend/src/components/ResultGrid.tsx b/frontend/src/components/ResultGrid.tsx
index b116e41..8156c4a 100644
--- a/frontend/src/components/ResultGrid.tsx
+++ b/frontend/src/components/ResultGrid.tsx
@@ -1,9 +1,16 @@
-import React, { useEffect, useState } from 'react';
+import React, { useEffect, useState, useRef } from 'react';
 import { DataGridPremium, GridColDef } from '@mui/x-data-grid-premium';
 import RunDetails from './RunDetails';
 import './SampleImage.css';
 import './ResultGrid.css';
 import { OpenAPI, SamplesService } from '../../openapi';
+import CheckCircleIcon from '@mui/icons-material/CheckCircle';
+import CancelIcon from '@mui/icons-material/Cancel';
+import AutorenewIcon from '@mui/icons-material/Autorenew';
+import HourglassEmptyIcon from '@mui/icons-material/HourglassEmpty';
+import RemoveCircleOutlineIcon from '@mui/icons-material/RemoveCircleOutline';
+// import ErrorIcon from '@mui/icons-material/Error';
+// import AccessTimeIcon from '@mui/icons-material/AccessTime';
 
 
 // Extend your image info interface if needed.
@@ -101,11 +108,57 @@ interface ResultGridProps {
   activePgroup: string;
 }
 
+const useJobStream = (onJobs: (jobs: any[]) => void) => {
+  const eventSourceRef = useRef<EventSource | null>(null);
+
+  useEffect(() => {
+    eventSourceRef.current = new EventSource(`${OpenAPI.BASE}/processing/jobs/stream`);
+    eventSourceRef.current.onmessage = (event) => {
+      // Receives: data: [{job_id, run_id, status, ...}, ...]
+      const jobs = JSON.parse(event.data);
+      onJobs(jobs);
+    };
+
+    return () => {
+      if (eventSourceRef.current) {
+        eventSourceRef.current.close();
+      }
+    };
+  }, [onJobs]);
+};
+
 const ResultGrid: React.FC<ResultGridProps> = ({ activePgroup }) => {
   const [rows, setRows] = useState([]);
   const [basePath, setBasePath] = useState('');
   const [detailPanelHeights, setDetailPanelHeights] = useState<{ [key: string]: number }>({}); // Store dynamic heights
+  const [jobStatusMap, setJobStatusMap] = useState<{ [runId: number]: string }>({});
+
+  const getStatusIcon = (status: string) => {
+    switch (status) {
+      case 'todo':
+        return <HourglassEmptyIcon />;
+      case 'submitted':
+        return <AutorenewIcon />;
+      case 'completed':
+        return <CheckCircleIcon />;
+      case 'failed':
+        return <CancelIcon />;
+      case 'no job':
+      default:
+        return <RemoveCircleOutlineIcon />;
+    }
+  };
+
+
+  useJobStream((jobs) => {
+    const map: { [runId: number]: string } = {};
+    for (const job of jobs) {
+      // Map job status by run_id (or job_id as preferred)
+      map[job.run_id] = job.status;
+    }
+    setJobStatusMap(map);
+  });
 
   const hasProcessingResults = (row: TreeRow): boolean => {
     // You can later replace this placeholder with actual logic.
@@ -238,6 +291,15 @@ const ResultGrid: React.FC<ResultGridProps> = ({ activePgroup }) => {
       headerName: 'Sample Name',
       width: 200,
     },
+    {
+      field: 'jobStatus',
+      headerName: 'Job Status',
+      width: 120,
+      renderCell: (params) =>
+        params.row.type === 'run'
+          ? getStatusIcon(jobStatusMap[params.row.experimentId] || 'no job')
+          : null,
+    },
     {
       field: 'puck_name',
       headerName: 'Puck Name',
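
Note: the background cleanup described in the commit message (remove cancelled jobs older than 2 hours, roughly once per hour) is not visible in the hunks above. A minimal sketch of how such a thread could look, assuming a session factory at app.database.SessionLocal, a JobStatus.CANCELLED member, and the function names shown here; none of these names are taken from this patch.

    # Hedged sketch, not code from this patch: names marked below are assumptions.
    import threading
    import time
    from datetime import datetime, timedelta

    from app.database import SessionLocal   # assumed session factory
    from app.models import Jobs as JobModel  # as imported in processing.py
    from app.models import JobStatus         # assumed to define CANCELLED


    def cleanup_cancelled_jobs() -> None:
        """Delete cancelled jobs whose last update is older than two hours."""
        cutoff = datetime.now() - timedelta(hours=2)
        with SessionLocal() as db:
            (
                db.query(JobModel)
                .filter(
                    JobModel.status == JobStatus.CANCELLED,
                    JobModel.updated_at < cutoff,
                )
                .delete(synchronize_session=False)
            )
            db.commit()


    def start_cleanup_thread() -> threading.Thread:
        """Run the cleanup roughly once per hour in a daemon thread."""

        def _loop() -> None:
            while True:
                cleanup_cancelled_jobs()
                time.sleep(3600)

        thread = threading.Thread(target=_loop, daemon=True)
        thread.start()
        return thread

A daemon thread keeps the periodic cleanup from blocking application shutdown; scheduling an asyncio task alongside the SSE generator would serve the same purpose.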