Refactor job model and endpoints for improved structure

Replaced the job model's `experiment_parameters_id` field with explicit `sample_id` and `run_id` foreign keys, and adjusted the corresponding relationships and routers. Added an endpoint for updating job status and restructured the job-streaming logic so each event carries detailed experiment and sample data.
GotthardG
2025-04-29 14:43:39 +02:00
parent 3eb4050d82
commit 9af2e84f9e
7 changed files with 154 additions and 142 deletions

View File

@@ -317,8 +317,9 @@ class Jobs(Base):
     __tablename__ = "jobs"
 
     id = Column(Integer, primary_key=True, index=True)
-    experiment_parameters_id = Column(Integer, ForeignKey("experiment_parameters.id"))
+    sample_id = Column(Integer, ForeignKey("samples.id"), nullable=False)
+    run_id = Column(Integer, ForeignKey("experiment_parameters.id"), nullable=False)
     status = Column(String, nullable=False)
-    parameters = relationship(ExperimentParameters)
+    experiment_parameters = relationship(ExperimentParameters)
     created_at = Column(DateTime, server_default=func.now())
     updated_at = Column(DateTime, onupdate=func.now())
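
Read together, the new columns split what `experiment_parameters_id` used to carry: `sample_id` points at the owning sample, while `run_id` still references `experiment_parameters.id` and the renamed `experiment_parameters` relationship resolves that row. A minimal sketch of reading a job under the new schema; the `SessionLocal` import path is an assumption:

# Sketch only: the import path for SessionLocal is an assumption.
from app.dependencies import SessionLocal
from app.models import Jobs

db = SessionLocal()
job = db.query(Jobs).filter(Jobs.status == "todo").first()
if job is not None:
    # run_id is a foreign key into experiment_parameters despite its name
    print(job.sample_id, job.run_id, job.status)
    # the renamed relationship resolves the linked ExperimentParameters row
    print(job.experiment_parameters)
db.close()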

View File

@@ -4,7 +4,13 @@ from fastapi import APIRouter, Depends
 from fastapi.encoders import jsonable_encoder
 from sqlalchemy.orm import Session
 from starlette.responses import StreamingResponse
-from app.models import JobStatus, Jobs as JobModel
+from app.models import (
+    JobStatus,
+    Jobs as JobModel,
+    ExperimentParameters as ExperimentParametersModel,
+    Sample as SampleModel,
+)
+from app.schemas import JobsResponse, JobsUpdate
 from app.dependencies import get_db
 
 router = APIRouter()
@@ -13,9 +19,37 @@ router = APIRouter()
 
 async def job_event_generator(db: Session):
     while True:
         jobs = db.query(JobModel).filter(JobModel.status == JobStatus.TODO).all()
-        if jobs:
-            job_payload = jsonable_encoder(jobs)
-            yield f"data: {json.dumps(job_payload)}\n\n"
+        job_items = []
+        for job in jobs:
+            sample = db.query(SampleModel).filter_by(id=job.sample_id).first()
+            experiment = (
+                db.query(ExperimentParametersModel)
+                .filter(ExperimentParametersModel.sample_id == sample.id)
+                .first()
+            )
+            job_item = JobsResponse(
+                job_id=job.id,
+                sample_id=sample.id,
+                run_id=getattr(experiment, "run_number", None),
+                sample_name=sample.sample_name,
+                status=job.status,
+                type=getattr(job, "type", "default_type"),
+                created_at=job.created_at,
+                updated_at=job.updated_at,
+                data_collection_parameters=sample.data_collection_parameters,
+                experiment_parameters=experiment.beamline_parameters
+                if experiment
+                else None,
+            )
+            job_items.append(job_item)
+
+        if job_items:
+            # jsonable_encoder handles the nested Pydantic models here; use each
+            # item's .json() instead if finer control over the structure is needed
+            serialized = jsonable_encoder(job_items)
+            yield f"data: {json.dumps(serialized)}\n\n"
 
         await asyncio.sleep(5)
@@ -25,3 +59,29 @@ async def job_event_generator(db: Session):
 @router.get("/jobs/stream")
 async def stream_jobs(db: Session = Depends(get_db)):
     return StreamingResponse(job_event_generator(db), media_type="text/event-stream")
+
+
+@router.post(
+    "/jobs/update_status", response_model=JobsUpdate, operation_id="update_status"
+)
+def update_jobs_status(payload: JobsUpdate, db: Session = Depends(get_db)):
+    # Fetch the job by job_id
+    job = db.query(JobModel).filter(JobModel.id == payload.job_id).first()
+    if not job:
+        # HTTPException yields a proper 404 response
+        from fastapi import HTTPException
+
+        raise HTTPException(status_code=404, detail="Job not found")
+
+    # Update the status and bump 'updated_at'
+    job.status = payload.status
+
+    from sqlalchemy import func
+
+    job.updated_at = func.now()
+
+    db.commit()
+    db.refresh(job)
+
+    # Return the updated job's info as the response
+    return JobsUpdate(job_id=job.id, status=job.status)
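
For reference, a client can exercise the new endpoint with a plain POST. A sketch using requests; the host and the /processing prefix match the URLs in the notebook later in this commit, and verify=False mirrors its self-signed-certificate setup:

import requests

# Same UPDATE_URL the notebook below uses; adjust host/port as needed.
UPDATE_URL = "https://127.0.0.1:8000/processing/jobs/update_status"

resp = requests.post(
    UPDATE_URL,
    json={"job_id": 1, "status": "submitted"},  # a JobsUpdate payload
    verify=False,  # the dev server runs with a self-signed certificate
)
resp.raise_for_status()
print(resp.json())  # e.g. {"job_id": 1, "status": "submitted"}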

View File

@@ -351,8 +351,9 @@ def create_experiment_parameters_for_sample(
     db.commit()
 
     new_job = JobModel(
-        experiment_parameters_id=new_exp.id,  # <-- Correct reference here
-        parameters=new_exp,  # assuming params has a to_dict() method
+        sample_id=sample_id,
+        run_id=new_exp.id,
+        experiment_parameters=new_exp,  # not sure yet
         status=JobStatus.TODO,
     )
     db.add(new_job)

View File

@@ -944,16 +944,32 @@ class ResultResponse(BaseModel):
 
 
 class JobsCreate(BaseModel):
-    id: str
-    name: str
+    id: int
+    sample_id: int
+    run_id: int
+    sample_name: str
     status: str
-    type: str
-    start_time: datetime
-    end_time: datetime
-    description: Optional[str]
-    parameters: str
-    datacollectionparameters: DataCollectionParameters
-    beamlineparameters: BeamlineParameters
+    created_at: datetime
+    updated_at: datetime
+    experiment_parameters: BeamlineParameters
 
     class Config:
         from_attributes = True
+
+
+class JobsResponse(BaseModel):
+    job_id: int
+    sample_id: int
+    run_id: int
+    sample_name: str
+    status: str
+    type: str
+    created_at: datetime
+    updated_at: Optional[datetime]
+    data_collection_parameters: Optional[DataCollectionParameters] = None
+    experiment_parameters: BeamlineParameters
+
+
+class JobsUpdate(BaseModel):
+    job_id: int
+    status: str
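
Since the stream serializes JobsResponse items with jsonable_encoder before writing them as SSE events, here is the round trip for the smallest of the new schemas, JobsUpdate; the values are invented for illustration:

import json
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel

class JobsUpdate(BaseModel):
    job_id: int
    status: str

payload = JobsUpdate(job_id=1, status="submitted")  # invented values
# jsonable_encoder reduces the model to plain dict/int/str values,
# which json.dumps then renders as the SSE data payload
print(f"data: {json.dumps(jsonable_encoder(payload))}\n\n")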

View File

@@ -128,8 +128,8 @@ async def lifespan(app: FastAPI):
     db = SessionLocal()
     try:
         if environment == "prod":
-            Base.metadata.drop_all(bind=engine)
-            Base.metadata.create_all(bind=engine)
+            # Base.metadata.drop_all(bind=engine)
+            # Base.metadata.create_all(bind=engine)
 
             from sqlalchemy.engine import reflection
             inspector = reflection.Inspector.from_engine(engine)
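
With the destructive drop_all/create_all calls commented out, prod no longer recreates the schema at startup. If table creation should still happen in development, one option is an explicit guard — a sketch reusing the environment, Base, and engine names from the surrounding code:

# Sketch: create tables only outside prod; prod schema changes are assumed
# to be handled elsewhere (e.g. by a migration tool).
if environment != "prod":
    Base.metadata.create_all(bind=engine)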

View File

@@ -9,7 +9,7 @@
     "is_executing": true
    },
    "ExecuteTime": {
-    "start_time": "2025-04-29T07:44:02.103530Z"
+    "start_time": "2025-04-29T12:21:15.813326Z"
    }
   },
   "source": [
@@ -18,13 +18,24 @@
    "import json\n",
    "\n",
    "SSE_URL = \"https://127.0.0.1:8000/processing/jobs/stream\"\n",
+   "UPDATE_URL = \"https://127.0.0.1:8000/processing/jobs/update_status\"\n",
    "\n",
+   "def submit_job_update(job_id, status):\n",
+   "    payload = {\n",
+   "        \"job_id\": job_id,\n",
+   "        \"status\": status\n",
+   "    }\n",
+   "    try:\n",
+   "        response = requests.post(UPDATE_URL, json=payload, verify=False)\n",
+   "        if response.status_code == 200:\n",
+   "            print(f\"✅ Job {job_id} status updated to '{status}'. Response: {response.json()}\")\n",
+   "        else:\n",
+   "            print(f\"❌ Failed to update job {job_id}. Status: {response.status_code}. Response: {response.text}\")\n",
+   "    except Exception as e:\n",
+   "        print(f\"Failed to submit update for Job {job_id}: {e}\")\n",
    "\n",
-   "#SSE_URL = \"https://mx-aare-test.psi.ch:1492/processing/jobs/stream\"\n",
-   "\n",
-   "\n",
-   "def listen_and_process_jobs(url):\n",
-   "    print(\"Starting processing pipeline...\")\n",
+   "def listen_and_update_jobs(url):\n",
+   "    print(\"Starting job status updater...\")\n",
    "    with requests.get(url, stream=True, verify=False) as response:\n",
    "        if response.status_code != 200:\n",
    "            print(f\"Failed to connect with status code: {response.status_code}\")\n",
@@ -37,69 +48,25 @@
    "                jobs = json.loads(event.data)\n",
    "                print(f\"Jobs received: {jobs}\")\n",
    "\n",
-   "                for job in jobs:\n",
-   "                    job_id = job.get(\"id\")\n",
-   "                    parameters = job.get(\"parameters\")\n",
-   "                    if parameters is None:\n",
-   "                        print(\n",
-   "                            f\"⚠️ Job {job_id if job_id is not None else '[unknown id]'} has no 'parameters'; skipping.\")\n",
-   "                        continue\n",
-   "\n",
-   "                    print(f\"Processing job ID: {job_id} with parameters: {parameters}\")\n",
-   "\n",
-   "                    # Your custom job-processing logic goes here\n",
-   "                    process_job_logic(job_id, parameters)\n",
-   "\n",
-   "                    # After job processing completes, send results & update job status\n",
-   "                    submit_job_result(job_id, processing_result={'success': True})\n",
+   "                #for job in jobs:\n",
+   "                #    job_id = job.get(\"job_id\")\n",
+   "                #    print(f\"Job ID: {job_id}, Current status: {job.get('status')}\")\n",
+   "                #    # Immediately update status to \"submitted\"\n",
+   "                #    submit_job_update(job_id, \"submitted\")\n",
    "            except json.JSONDecodeError as e:\n",
    "                print(f\"Error decoding event data: {e}\")\n",
    "            except Exception as e:\n",
    "                print(f\"Unexpected error while processing event: {e}\")\n",
    "\n",
-   "\n",
-   "def process_job_logic(job_id, parameters):\n",
-   "    print(f\"📦 {job_id} - Processing detailed logic here: {parameters}\")\n",
-   "    import time\n",
-   "    time.sleep(5)\n",
-   "    print(f\"✅ Job {job_id} processing complete\")\n",
-   "\n",
-   "\n",
-   "def submit_job_result(job_id, processing_result):\n",
-   "    import backend.aareDBclient as aareDBclient\n",
-   "    from aareDBclient.models import ResultCreate\n",
-   "\n",
-   "    configuration = aareDBclient.Configuration(\n",
-   "        host=\"https://127.0.0.1:8000\",\n",
-   "        verify_ssl=False\n",
-   "    )\n",
-   "\n",
-   "    with aareDBclient.ApiClient(configuration) as api_client:\n",
-   "        api_instance = aareDBclient.ProcessingApi(api_client)\n",
-   "\n",
-   "        result = ResultCreate(\n",
-   "            job_id=job_id,\n",
-   "            result=processing_result\n",
-   "        )\n",
-   "        try:\n",
-   "            res = api_instance.submit_processing_result(result_create=result)\n",
-   "            print(f\"Job {job_id} result submitted successfully! Backend Response: {res}\")\n",
-   "        except aareDBclient.rest.ApiException as e:\n",
-   "            print(f\"Failed to submit result for Job {job_id}: {e}\")\n",
-   "\n",
-   "\n",
    "if __name__ == \"__main__\":\n",
-   "    listen_and_process_jobs(SSE_URL)"
+   "    listen_and_update_jobs(SSE_URL)\n"
   ],
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
-     "Starting processing pipeline...\n",
-     "Jobs received: [{'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:40:46.419291', 'id': 1, 'experiment_parameters_id': 1}, {'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:41:44.451862', 'id': 2, 'experiment_parameters_id': 2}]\n",
-     "⚠️ Job 1 has no 'parameters'; skipping.\n",
-     "⚠️ Job 2 has no 'parameters'; skipping.\n"
+     "Starting job status updater...\n"
     ]
    },
    {
@@ -109,39 +76,6 @@
     "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/urllib3/connectionpool.py:1103: InsecureRequestWarning: Unverified HTTPS request is being made to host '127.0.0.1'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#tls-warnings\n",
     "  warnings.warn(\n"
     ]
-   },
-   {
-    "name": "stdout",
-    "output_type": "stream",
-    "text": [
-     "Jobs received: [{'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:40:46.419291', 'id': 1, 'experiment_parameters_id': 1}, {'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:41:44.451862', 'id': 2, 'experiment_parameters_id': 2}]\n",
-     "⚠️ Job 1 has no 'parameters'; skipping.\n",
-     "⚠️ Job 2 has no 'parameters'; skipping.\n",
-     "Jobs received: [{'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:40:46.419291', 'id': 1, 'experiment_parameters_id': 1}, {'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:41:44.451862', 'id': 2, 'experiment_parameters_id': 2}]\n",
-     "⚠️ Job 1 has no 'parameters'; skipping.\n",
-     "⚠️ Job 2 has no 'parameters'; skipping.\n",
-     "Jobs received: [{'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:40:46.419291', 'id': 1, 'experiment_parameters_id': 1}, {'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:41:44.451862', 'id': 2, 'experiment_parameters_id': 2}]\n",
-     "⚠️ Job 1 has no 'parameters'; skipping.\n",
-     "⚠️ Job 2 has no 'parameters'; skipping.\n",
-     "Jobs received: [{'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:40:46.419291', 'id': 1, 'experiment_parameters_id': 1}, {'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:41:44.451862', 'id': 2, 'experiment_parameters_id': 2}]\n",
-     "⚠️ Job 1 has no 'parameters'; skipping.\n",
-     "⚠️ Job 2 has no 'parameters'; skipping.\n",
-     "Jobs received: [{'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:40:46.419291', 'id': 1, 'experiment_parameters_id': 1}, {'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:41:44.451862', 'id': 2, 'experiment_parameters_id': 2}]\n",
-     "⚠️ Job 1 has no 'parameters'; skipping.\n",
-     "⚠️ Job 2 has no 'parameters'; skipping.\n",
-     "Jobs received: [{'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:40:46.419291', 'id': 1, 'experiment_parameters_id': 1}, {'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:41:44.451862', 'id': 2, 'experiment_parameters_id': 2}]\n",
-     "⚠️ Job 1 has no 'parameters'; skipping.\n",
-     "⚠️ Job 2 has no 'parameters'; skipping.\n",
-     "Jobs received: [{'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:40:46.419291', 'id': 1, 'experiment_parameters_id': 1}, {'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:41:44.451862', 'id': 2, 'experiment_parameters_id': 2}]\n",
-     "⚠️ Job 1 has no 'parameters'; skipping.\n",
-     "⚠️ Job 2 has no 'parameters'; skipping.\n",
-     "Jobs received: [{'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:40:46.419291', 'id': 1, 'experiment_parameters_id': 1}, {'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:41:44.451862', 'id': 2, 'experiment_parameters_id': 2}]\n",
-     "⚠️ Job 1 has no 'parameters'; skipping.\n",
-     "⚠️ Job 2 has no 'parameters'; skipping.\n",
-     "Jobs received: [{'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:40:46.419291', 'id': 1, 'experiment_parameters_id': 1}, {'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:41:44.451862', 'id': 2, 'experiment_parameters_id': 2}]\n",
-     "⚠️ Job 1 has no 'parameters'; skipping.\n",
-     "⚠️ Job 2 has no 'parameters'; skipping.\n"
-    ]
    }
   ],
   "execution_count": null

File diff suppressed because one or more lines are too long