Add job cancellation handling and periodic cleanup logic
Introduce new statuses, "to_cancel" and "cancelled", to improve job state tracking. Implement logic to nullify `slurm_id` for cancelled jobs and a background thread to clean up cancelled jobs older than 2 hours. Ensure periodic cleanup runs hourly to maintain database hygiene.
This commit is contained in:
parent
9e875c5a04
commit
a1b857b78a
@ -313,6 +313,8 @@ class JobStatus(str, enum.Enum):
|
|||||||
TODO = "todo"
|
TODO = "todo"
|
||||||
SUBMITTED = "submitted"
|
SUBMITTED = "submitted"
|
||||||
DONE = "done"
|
DONE = "done"
|
||||||
|
TO_CANCEL = "to_cancel"
|
||||||
|
CANCELLED = "cancelled"
|
||||||
FAILED = "failed"
|
FAILED = "failed"
|
||||||
|
|
||||||
|
|
||||||
|
@ -80,6 +80,10 @@ def update_jobs_status(payload: JobsUpdate, db: Session = Depends(get_db)):
|
|||||||
|
|
||||||
raise HTTPException(status_code=404, detail="Job not found")
|
raise HTTPException(status_code=404, detail="Job not found")
|
||||||
|
|
||||||
|
# If status is being updated to "cancelled"
|
||||||
|
if payload.status == "cancelled":
|
||||||
|
job.slurm_id = None
|
||||||
|
|
||||||
# Update the status
|
# Update the status
|
||||||
job.status = payload.status
|
job.status = payload.status
|
||||||
job.slurm_id = payload.slurm_id
|
job.slurm_id = payload.slurm_id
|
||||||
@ -93,3 +97,23 @@ def update_jobs_status(payload: JobsUpdate, db: Session = Depends(get_db)):
|
|||||||
|
|
||||||
# Return the updated job's info as response
|
# Return the updated job's info as response
|
||||||
return JobsUpdate(job_id=job.id, status=job.status, slurm_id=job.slurm_id)
|
return JobsUpdate(job_id=job.id, status=job.status, slurm_id=job.slurm_id)
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_cancelled_jobs(db: Session, max_age_hours: float = 2) -> int:
    """Delete jobs that have been in the 'cancelled' state for too long.

    A job is removed when its status is ``"cancelled"`` and its
    ``updated_at`` timestamp is older than ``max_age_hours`` before now.

    Args:
        db: Database session used to query, delete and commit.
        max_age_hours: Minimum age, in hours, a cancelled job must have
            before it is deleted. Defaults to 2.

    Returns:
        The number of jobs that were deleted.

    Raises:
        Exception: Any error raised while deleting or committing is
            re-raised after the session has been rolled back.
    """
    from datetime import datetime, timedelta

    # Take a single timestamp so the logged "current time" is exactly the
    # one the cutoff was derived from.
    # NOTE(review): this is naive local time; assumes JobModel.updated_at is
    # stored as naive local time as well -- confirm against the model.
    now = datetime.now()
    cutoff = now - timedelta(hours=max_age_hours)
    print(
        f"Cleaning up cancelled jobs older than {cutoff} "
        f"(current time: {now})"
    )

    old_jobs = (
        db.query(JobModel)
        .filter(JobModel.status == "cancelled", JobModel.updated_at < cutoff)
        .all()
    )

    try:
        # Delete through the session object by object (rather than a bulk
        # query delete) so any ORM-level cascades still apply.
        for job in old_jobs:
            db.delete(job)
        db.commit()
    except Exception:
        # Leave the session in a clean state for whoever owns it.
        db.rollback()
        raise

    return len(old_jobs)
|
||||||
|
@ -122,6 +122,20 @@ if environment == "dev":
|
|||||||
ssl_heidi.generate_self_signed_cert(cert_path, key_path)
|
ssl_heidi.generate_self_signed_cert(cert_path, key_path)
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_job_loop(interval_seconds: float = 3600):
    """Run the cancelled-job cleanup forever, once per interval.

    Intended to be used as the target of a background thread. Each cycle
    obtains a fresh database session from ``get_db``, runs
    ``cleanup_cancelled_jobs`` on it, closes the session, and then sleeps.

    A failure in one cycle is reported and the loop continues; otherwise a
    single transient database error would end the thread and cleanup would
    silently stop until the application is restarted.

    Args:
        interval_seconds: Seconds to sleep between cleanup cycles.
            Defaults to 3600 (one hour).
    """
    import time
    import traceback

    # Imported lazily inside the function, as in the original code.
    from app.dependencies import get_db
    from app.routers.processing import cleanup_cancelled_jobs

    while True:
        try:
            # get_db is consumed with next(), i.e. it yields the session;
            # acquiring it inside the try means a connection failure is
            # handled the same way as a cleanup failure.
            db = next(get_db())
            try:
                cleanup_cancelled_jobs(db)
            finally:
                db.close()
        except Exception:
            # Top-level boundary of the worker thread: report and keep going
            # so the next cycle can retry.
            print("[ERROR] Cancelled-job cleanup failed; will retry next cycle")
            traceback.print_exc()
        time.sleep(interval_seconds)
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
print("[INFO] Running application startup tasks...")
|
print("[INFO] Running application startup tasks...")
|
||||||
@ -174,6 +188,12 @@ async def lifespan(app: FastAPI):
|
|||||||
|
|
||||||
load_slots_data(db)
|
load_slots_data(db)
|
||||||
|
|
||||||
|
from threading import Thread
|
||||||
|
|
||||||
|
# Start cleanup in background thread
|
||||||
|
thread = Thread(target=cleanup_job_loop, daemon=True)
|
||||||
|
thread.start()
|
||||||
|
|
||||||
yield
|
yield
|
||||||
finally:
|
finally:
|
||||||
db.close()
|
db.close()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user