Refactor job model and optimize job streaming.

Updated the `JobModel` with foreign key relationships and string-based status to enhance database consistency, and improved job event streaming by using `jsonable_encoder` for better serialization. Also added `urllib3` to the declared dependencies to handle HTTP requests.
This commit is contained in:
GotthardG
2025-04-29 09:47:57 +02:00
parent 866139baea
commit 3eb4050d82
9 changed files with 352 additions and 7909 deletions

View File

@@ -7,7 +7,6 @@ from sqlalchemy import (
JSON,
DateTime,
Boolean,
Enum,
func,
)
from sqlalchemy.orm import relationship
@@ -318,8 +317,8 @@ class Jobs(Base):
__tablename__ = "jobs"
id = Column(Integer, primary_key=True, index=True)
experiment_parameters_id = Column(Integer, nullable=False)
status = Column(Enum(JobStatus), default=JobStatus.TODO, nullable=False)
parameters = Column(JSON, nullable=False)
experiment_parameters_id = Column(Integer, ForeignKey("experiment_parameters.id"))
status = Column(String, nullable=False)
parameters = relationship(ExperimentParameters)
created_at = Column(DateTime, server_default=func.now())
updated_at = Column(DateTime, onupdate=func.now())

View File

@@ -1,6 +1,7 @@
import json
import asyncio
from fastapi import APIRouter, Depends
from fastapi.encoders import jsonable_encoder
from sqlalchemy.orm import Session
from starlette.responses import StreamingResponse
from app.models import JobStatus, Jobs as JobModel
@@ -11,15 +12,14 @@ router = APIRouter()
async def job_event_generator(db: Session):
while True:
# Fetch jobs with status TODO
jobs = db.query(JobModel).filter(JobModel.status == JobStatus.TODO).all()
if jobs:
# It's recommended to explicitly communicate IDs clearly
job_payload = [{"id": job.id, "parameters": job.parameters} for job in jobs]
job_payload = jsonable_encoder(jobs)
yield f"data: {json.dumps(job_payload)}\n\n"
await asyncio.sleep(5)
await asyncio.sleep(5) # A reasonable heartbeat/refresh
# A reasonable heartbeat/refresh
@router.get("/jobs/stream")

View File

@@ -352,7 +352,7 @@ def create_experiment_parameters_for_sample(
new_job = JobModel(
experiment_parameters_id=new_exp.id, # <-- Correct reference here
parameters=new_exp.to_dict(), # assuming params has a to_dict() method
parameters=new_exp, # assuming params has a to_dict() method
status=JobStatus.TODO,
)
db.add(new_job)

View File

@@ -941,3 +941,19 @@ class ResultResponse(BaseModel):
class Config:
from_attributes = True
class JobsCreate(BaseModel):
id: str
name: str
status: str
type: str
start_time: datetime
end_time: datetime
description: Optional[str]
parameters: str
datacollectionparameters: DataCollectionParameters
beamlineparameters: BeamlineParameters
class Config:
from_attributes = True

171
backend/propipe_sim.ipynb Normal file
View File

@@ -0,0 +1,171 @@
{
"cells": [
{
"cell_type": "code",
"id": "initial_id",
"metadata": {
"collapsed": true,
"jupyter": {
"is_executing": true
},
"ExecuteTime": {
"start_time": "2025-04-29T07:44:02.103530Z"
}
},
"source": [
"import requests\n",
"import sseclient\n",
"import json\n",
"\n",
"SSE_URL = \"https://127.0.0.1:8000/processing/jobs/stream\"\n",
"\n",
"\n",
"#SSE_URL = \"https://mx-aare-test.psi.ch:1492/processing/jobs/stream\"\n",
"\n",
"\n",
"def listen_and_process_jobs(url):\n",
" print(\"Starting processing pipeline...\")\n",
" with requests.get(url, stream=True, verify=False) as response:\n",
" if response.status_code != 200:\n",
" print(f\"Failed to connect with status code: {response.status_code}\")\n",
" return\n",
"\n",
" client = sseclient.SSEClient(response)\n",
"\n",
" for event in client.events():\n",
" try:\n",
" jobs = json.loads(event.data)\n",
" print(f\"Jobs received: {jobs}\")\n",
"\n",
" for job in jobs:\n",
" job_id = job.get(\"id\")\n",
" parameters = job.get(\"parameters\")\n",
" if parameters is None:\n",
" print(\n",
" f\"⚠️ Job {job_id if job_id is not None else '[unknown id]'} has no 'parameters'; skipping.\")\n",
" continue\n",
"\n",
" print(f\"Processing job ID: {job_id} with parameters: {parameters}\")\n",
"\n",
" # Your custom job-processing logic goes here\n",
" process_job_logic(job_id, parameters)\n",
"\n",
" # After job processing completes, send results & update job status\n",
" submit_job_result(job_id, processing_result={'success': True})\n",
" except json.JSONDecodeError as e:\n",
" print(f\"Error decoding event data: {e}\")\n",
" except Exception as e:\n",
" print(f\"Unexpected error while processing event: {e}\")\n",
"\n",
"\n",
"def process_job_logic(job_id, parameters):\n",
" print(f\"📦 {job_id} - Processing detailed logic here: {parameters}\")\n",
" import time\n",
" time.sleep(5)\n",
" print(f\"✅ Job {job_id} processing complete\")\n",
"\n",
"\n",
"def submit_job_result(job_id, processing_result):\n",
" import backend.aareDBclient as aareDBclient\n",
" from aareDBclient.models import ResultCreate\n",
"\n",
" configuration = aareDBclient.Configuration(\n",
" host=\"https://127.0.0.1:8000\",\n",
" verify_ssl=False\n",
" )\n",
"\n",
" with aareDBclient.ApiClient(configuration) as api_client:\n",
" api_instance = aareDBclient.ProcessingApi(api_client)\n",
"\n",
" result = ResultCreate(\n",
" job_id=job_id,\n",
" result=processing_result\n",
" )\n",
" try:\n",
" res = api_instance.submit_processing_result(result_create=result)\n",
" print(f\"Job {job_id} result submitted successfully! Backend Response: {res}\")\n",
" except aareDBclient.rest.ApiException as e:\n",
" print(f\"Failed to submit result for Job {job_id}: {e}\")\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",
" listen_and_process_jobs(SSE_URL)"
],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Starting processing pipeline...\n",
"Jobs received: [{'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:40:46.419291', 'id': 1, 'experiment_parameters_id': 1}, {'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:41:44.451862', 'id': 2, 'experiment_parameters_id': 2}]\n",
"⚠️ Job 1 has no 'parameters'; skipping.\n",
"⚠️ Job 2 has no 'parameters'; skipping.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/urllib3/connectionpool.py:1103: InsecureRequestWarning: Unverified HTTPS request is being made to host '127.0.0.1'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#tls-warnings\n",
" warnings.warn(\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Jobs received: [{'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:40:46.419291', 'id': 1, 'experiment_parameters_id': 1}, {'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:41:44.451862', 'id': 2, 'experiment_parameters_id': 2}]\n",
"⚠️ Job 1 has no 'parameters'; skipping.\n",
"⚠️ Job 2 has no 'parameters'; skipping.\n",
"Jobs received: [{'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:40:46.419291', 'id': 1, 'experiment_parameters_id': 1}, {'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:41:44.451862', 'id': 2, 'experiment_parameters_id': 2}]\n",
"⚠️ Job 1 has no 'parameters'; skipping.\n",
"⚠️ Job 2 has no 'parameters'; skipping.\n",
"Jobs received: [{'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:40:46.419291', 'id': 1, 'experiment_parameters_id': 1}, {'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:41:44.451862', 'id': 2, 'experiment_parameters_id': 2}]\n",
"⚠️ Job 1 has no 'parameters'; skipping.\n",
"⚠️ Job 2 has no 'parameters'; skipping.\n",
"Jobs received: [{'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:40:46.419291', 'id': 1, 'experiment_parameters_id': 1}, {'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:41:44.451862', 'id': 2, 'experiment_parameters_id': 2}]\n",
"⚠️ Job 1 has no 'parameters'; skipping.\n",
"⚠️ Job 2 has no 'parameters'; skipping.\n",
"Jobs received: [{'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:40:46.419291', 'id': 1, 'experiment_parameters_id': 1}, {'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:41:44.451862', 'id': 2, 'experiment_parameters_id': 2}]\n",
"⚠️ Job 1 has no 'parameters'; skipping.\n",
"⚠️ Job 2 has no 'parameters'; skipping.\n",
"Jobs received: [{'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:40:46.419291', 'id': 1, 'experiment_parameters_id': 1}, {'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:41:44.451862', 'id': 2, 'experiment_parameters_id': 2}]\n",
"⚠️ Job 1 has no 'parameters'; skipping.\n",
"⚠️ Job 2 has no 'parameters'; skipping.\n",
"Jobs received: [{'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:40:46.419291', 'id': 1, 'experiment_parameters_id': 1}, {'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:41:44.451862', 'id': 2, 'experiment_parameters_id': 2}]\n",
"⚠️ Job 1 has no 'parameters'; skipping.\n",
"⚠️ Job 2 has no 'parameters'; skipping.\n",
"Jobs received: [{'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:40:46.419291', 'id': 1, 'experiment_parameters_id': 1}, {'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:41:44.451862', 'id': 2, 'experiment_parameters_id': 2}]\n",
"⚠️ Job 1 has no 'parameters'; skipping.\n",
"⚠️ Job 2 has no 'parameters'; skipping.\n",
"Jobs received: [{'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:40:46.419291', 'id': 1, 'experiment_parameters_id': 1}, {'status': 'todo', 'updated_at': None, 'created_at': '2025-04-29T07:41:44.451862', 'id': 2, 'experiment_parameters_id': 2}]\n",
"⚠️ Job 1 has no 'parameters'; skipping.\n",
"⚠️ Job 2 has no 'parameters'; skipping.\n"
]
}
],
"execution_count": null
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@@ -29,7 +29,8 @@ dependencies = [
"python-dateutil~=2.8.2",
"tomli>=2.0.1",
"python-dotenv",
    "psycopg2-binary",
"urllib3~=2.2.1"
]
[tool.pytest.ini_options]
norecursedirs = ["backend/python-client"]