Added a test function for SSE

This commit is contained in:
GotthardG 2025-04-11 13:59:46 +02:00
parent 2e6d06018c
commit 866139baea
2 changed files with 130 additions and 7 deletions

View File

@ -1,25 +1,27 @@
import json
import asyncio
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from starlette.responses import StreamingResponse
import asyncio
import json
from app.models import JobStatus, Jobs as JobModel
from app.dependencies import get_db
router = APIRouter()
async def job_event_generator(db):
async def job_event_generator(db: Session):
while True:
# Fetch jobs with status TODO
jobs = db.query(JobModel).filter(JobModel.status == JobStatus.TODO).all()
if jobs:
yield json.dumps([job.parameters for job in jobs]) + "\n\n"
await asyncio.sleep(5)
# It's recommended to explicitly communicate IDs clearly
job_payload = [{"id": job.id, "parameters": job.parameters} for job in jobs]
yield f"data: {json.dumps(job_payload)}\n\n"
await asyncio.sleep(5) # A reasonable heartbeat/refresh
@router.get("/jobs/stream")
async def stream_jobs(db: Session = Depends(get_db)):
# test
return StreamingResponse(job_event_generator(db), media_type="text/event-stream")

View File

@ -935,6 +935,127 @@
}
],
"execution_count": 6
},
{
"metadata": {
"ExecuteTime": {
"end_time": "2025-04-11T11:57:22.504015Z",
"start_time": "2025-04-11T11:56:30.723778Z"
}
},
"cell_type": "code",
"source": [
"import requests\n",
"import sseclient\n",
"import json\n",
"\n",
"SSE_URL = \"https://127.0.0.1:8000/processing/jobs/stream\" # Replace clearly according to backend\n",
"\n",
"\n",
"def listen_and_process_jobs(url):\n",
" print(\"Starting processing pipeline...\")\n",
" with requests.get(url, stream=True, verify=False) as response:\n",
" if response.status_code != 200:\n",
" print(f\"Failed to connect with status code: {response.status_code}\")\n",
" return\n",
"\n",
" client = sseclient.SSEClient(response)\n",
"\n",
" for event in client.events():\n",
" try:\n",
" jobs = json.loads(event.data)\n",
" print(f\"Jobs received: {jobs}\")\n",
"\n",
" for job in jobs:\n",
" job_id = job[\"id\"]\n",
" parameters = job[\"parameters\"]\n",
" print(f\"Processing job ID: {job_id} with parameters: {parameters}\")\n",
"\n",
" # TODO: your custom job-processing logic clearly goes here\n",
" process_job_logic(job_id, parameters)\n",
"\n",
" # After job processing completes, send results & update job status\n",
" submit_job_result(job_id, processing_result={'success': True})\n",
" except json.JSONDecodeError as e:\n",
" print(f\"Error decoding event data: {e}\")\n",
"\n",
"\n",
"def process_job_logic(job_id, parameters):\n",
" # Here: insert your application-specific processing logic\n",
" print(f\"📦 {job_id} - Processing detailed logic clearly here: {parameters}\")\n",
" # simulate execution:\n",
" import time\n",
" time.sleep(5) # Simulating task clearly\n",
" print(f\"✅ Job {job_id} processing complete\")\n",
"\n",
"\n",
"def submit_job_result(job_id, processing_result):\n",
" # Using autogenerated client or direct HTTP calls clearly:\n",
" import backend.aareDBclient as aareDBclient\n",
" from aareDBclient.models import ResultCreate\n",
"\n",
" configuration = aareDBclient.Configuration(\n",
" host=\"https://127.0.0.1:8000\",\n",
" verify_ssl=False\n",
" )\n",
"\n",
" with aareDBclient.ApiClient(configuration) as api_client:\n",
" api_instance = aareDBclient.ProcessingApi(api_client) # assuming this exists!\n",
"\n",
" result = ResultCreate(\n",
" job_id=job_id, # Assuming you have such a field clearly\n",
" result=processing_result\n",
" )\n",
" try:\n",
" res = api_instance.submit_processing_result(result_create=result)\n",
" print(f\"Job {job_id} result submitted successfully! Backend Response: {res}\")\n",
" except aareDBclient.rest.ApiException as e:\n",
" print(f\"Failed to submit result for Job {job_id}: {e}\")\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",
" listen_and_process_jobs(SSE_URL)\n"
],
"id": "aef43f1265a23cb1",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Starting processing pipeline...\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"/Users/gotthardg/PycharmProjects/aaredb/.venv/lib/python3.12/site-packages/urllib3/connectionpool.py:1097: InsecureRequestWarning: Unverified HTTPS request is being made to host '127.0.0.1'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#tls-warnings\n",
" warnings.warn(\n"
]
},
{
"ename": "KeyboardInterrupt",
"evalue": "",
"output_type": "error",
"traceback": [
"\u001B[31m---------------------------------------------------------------------------\u001B[39m",
"\u001B[31mKeyboardInterrupt\u001B[39m Traceback (most recent call last)",
"\u001B[36mCell\u001B[39m\u001B[36m \u001B[39m\u001B[32mIn[3]\u001B[39m\u001B[32m, line 70\u001B[39m\n\u001B[32m 66\u001B[39m \u001B[38;5;28mprint\u001B[39m(\u001B[33mf\u001B[39m\u001B[33m\"\u001B[39m\u001B[33mFailed to submit result for Job \u001B[39m\u001B[38;5;132;01m{\u001B[39;00mjob_id\u001B[38;5;132;01m}\u001B[39;00m\u001B[33m: \u001B[39m\u001B[38;5;132;01m{\u001B[39;00me\u001B[38;5;132;01m}\u001B[39;00m\u001B[33m\"\u001B[39m)\n\u001B[32m 69\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m \u001B[34m__name__\u001B[39m == \u001B[33m\"\u001B[39m\u001B[33m__main__\u001B[39m\u001B[33m\"\u001B[39m:\n\u001B[32m---> \u001B[39m\u001B[32m70\u001B[39m \u001B[43mlisten_and_process_jobs\u001B[49m\u001B[43m(\u001B[49m\u001B[43mSSE_URL\u001B[49m\u001B[43m)\u001B[49m\n",
"\u001B[36mCell\u001B[39m\u001B[36m \u001B[39m\u001B[32mIn[3]\u001B[39m\u001B[32m, line 17\u001B[39m, in \u001B[36mlisten_and_process_jobs\u001B[39m\u001B[34m(url)\u001B[39m\n\u001B[32m 13\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m\n\u001B[32m 15\u001B[39m client = sseclient.SSEClient(response)\n\u001B[32m---> \u001B[39m\u001B[32m17\u001B[39m \u001B[43m\u001B[49m\u001B[38;5;28;43;01mfor\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mevent\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;129;43;01min\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mclient\u001B[49m\u001B[43m.\u001B[49m\u001B[43mevents\u001B[49m\u001B[43m(\u001B[49m\u001B[43m)\u001B[49m\u001B[43m:\u001B[49m\n\u001B[32m 18\u001B[39m \u001B[43m \u001B[49m\u001B[38;5;28;43;01mtry\u001B[39;49;00m\u001B[43m:\u001B[49m\n\u001B[32m 19\u001B[39m \u001B[43m \u001B[49m\u001B[43mjobs\u001B[49m\u001B[43m \u001B[49m\u001B[43m=\u001B[49m\u001B[43m \u001B[49m\u001B[43mjson\u001B[49m\u001B[43m.\u001B[49m\u001B[43mloads\u001B[49m\u001B[43m(\u001B[49m\u001B[43mevent\u001B[49m\u001B[43m.\u001B[49m\u001B[43mdata\u001B[49m\u001B[43m)\u001B[49m\n",
"\u001B[36mFile \u001B[39m\u001B[32m~/PycharmProjects/aaredb/.venv/lib/python3.12/site-packages/sseclient/__init__.py:55\u001B[39m, in \u001B[36mSSEClient.events\u001B[39m\u001B[34m(self)\u001B[39m\n\u001B[32m 54\u001B[39m \u001B[38;5;28;01mdef\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34mevents\u001B[39m(\u001B[38;5;28mself\u001B[39m):\n\u001B[32m---> \u001B[39m\u001B[32m55\u001B[39m \u001B[43m \u001B[49m\u001B[38;5;28;43;01mfor\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mchunk\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;129;43;01min\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[38;5;28;43mself\u001B[39;49m\u001B[43m.\u001B[49m\u001B[43m_read\u001B[49m\u001B[43m(\u001B[49m\u001B[43m)\u001B[49m\u001B[43m:\u001B[49m\n\u001B[32m 56\u001B[39m \u001B[43m \u001B[49m\u001B[43mevent\u001B[49m\u001B[43m \u001B[49m\u001B[43m=\u001B[49m\u001B[43m \u001B[49m\u001B[43mEvent\u001B[49m\u001B[43m(\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 57\u001B[39m \u001B[43m \u001B[49m\u001B[38;5;66;43;03m# Split before decoding so splitlines() only uses \\r and \\n\u001B[39;49;00m\n",
"\u001B[36mFile \u001B[39m\u001B[32m~/PycharmProjects/aaredb/.venv/lib/python3.12/site-packages/sseclient/__init__.py:45\u001B[39m, in \u001B[36mSSEClient._read\u001B[39m\u001B[34m(self)\u001B[39m\n\u001B[32m 38\u001B[39m \u001B[38;5;250m\u001B[39m\u001B[33;03m\"\"\"Read the incoming event source stream and yield event chunks.\u001B[39;00m\n\u001B[32m 39\u001B[39m \n\u001B[32m 40\u001B[39m \u001B[33;03mUnfortunately it is possible for some servers to decide to break an\u001B[39;00m\n\u001B[32m 41\u001B[39m \u001B[33;03mevent into multiple HTTP chunks in the response. It is thus necessary\u001B[39;00m\n\u001B[32m 42\u001B[39m \u001B[33;03mto correctly stitch together consecutive response chunks and find the\u001B[39;00m\n\u001B[32m 43\u001B[39m \u001B[33;03mSSE delimiter (empty new line) to yield full, correct event chunks.\"\"\"\u001B[39;00m\n\u001B[32m 44\u001B[39m data = \u001B[33mb\u001B[39m\u001B[33m'\u001B[39m\u001B[33m'\u001B[39m\n\u001B[32m---> \u001B[39m\u001B[32m45\u001B[39m \u001B[43m\u001B[49m\u001B[38;5;28;43;01mfor\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mchunk\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;129;43;01min\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[38;5;28;43mself\u001B[39;49m\u001B[43m.\u001B[49m\u001B[43m_event_source\u001B[49m\u001B[43m:\u001B[49m\n\u001B[32m 46\u001B[39m \u001B[43m \u001B[49m\u001B[38;5;28;43;01mfor\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mline\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;129;43;01min\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mchunk\u001B[49m\u001B[43m.\u001B[49m\u001B[43msplitlines\u001B[49m\u001B[43m(\u001B[49m\u001B[38;5;28;43;01mTrue\u001B[39;49;00m\u001B[43m)\u001B[49m\u001B[43m:\u001B[49m\n\u001B[32m 47\u001B[39m \u001B[43m \u001B[49m\u001B[43mdata\u001B[49m\u001B[43m \u001B[49m\u001B[43m+\u001B[49m\u001B[43m=\u001B[49m\u001B[43m \u001B[49m\u001B[43mline\u001B[49m\n",
"\u001B[36mFile \u001B[39m\u001B[32m~/PycharmProjects/aaredb/.venv/lib/python3.12/site-packages/requests/models.py:820\u001B[39m, in \u001B[36mResponse.iter_content.<locals>.generate\u001B[39m\u001B[34m()\u001B[39m\n\u001B[32m 818\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;28mhasattr\u001B[39m(\u001B[38;5;28mself\u001B[39m.raw, \u001B[33m\"\u001B[39m\u001B[33mstream\u001B[39m\u001B[33m\"\u001B[39m):\n\u001B[32m 819\u001B[39m \u001B[38;5;28;01mtry\u001B[39;00m:\n\u001B[32m--> \u001B[39m\u001B[32m820\u001B[39m \u001B[38;5;28;01myield from\u001B[39;00m \u001B[38;5;28mself\u001B[39m.raw.stream(chunk_size, decode_content=\u001B[38;5;28;01mTrue\u001B[39;00m)\n\u001B[32m 821\u001B[39m \u001B[38;5;28;01mexcept\u001B[39;00m ProtocolError \u001B[38;5;28;01mas\u001B[39;00m e:\n\u001B[32m 822\u001B[39m \u001B[38;5;28;01mraise\u001B[39;00m ChunkedEncodingError(e)\n",
"\u001B[36mFile \u001B[39m\u001B[32m~/PycharmProjects/aaredb/.venv/lib/python3.12/site-packages/urllib3/response.py:1063\u001B[39m, in \u001B[36mHTTPResponse.stream\u001B[39m\u001B[34m(self, amt, decode_content)\u001B[39m\n\u001B[32m 1047\u001B[39m \u001B[38;5;250m\u001B[39m\u001B[33;03m\"\"\"\u001B[39;00m\n\u001B[32m 1048\u001B[39m \u001B[33;03mA generator wrapper for the read() method. A call will block until\u001B[39;00m\n\u001B[32m 1049\u001B[39m \u001B[33;03m``amt`` bytes have been read from the connection or until the\u001B[39;00m\n\u001B[32m (...)\u001B[39m\u001B[32m 1060\u001B[39m \u001B[33;03m 'content-encoding' header.\u001B[39;00m\n\u001B[32m 1061\u001B[39m \u001B[33;03m\"\"\"\u001B[39;00m\n\u001B[32m 1062\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;28mself\u001B[39m.chunked \u001B[38;5;129;01mand\u001B[39;00m \u001B[38;5;28mself\u001B[39m.supports_chunked_reads():\n\u001B[32m-> \u001B[39m\u001B[32m1063\u001B[39m \u001B[38;5;28;01myield from\u001B[39;00m \u001B[38;5;28mself\u001B[39m.read_chunked(amt, decode_content=decode_content)\n\u001B[32m 1064\u001B[39m \u001B[38;5;28;01melse\u001B[39;00m:\n\u001B[32m 1065\u001B[39m \u001B[38;5;28;01mwhile\u001B[39;00m \u001B[38;5;129;01mnot\u001B[39;00m is_fp_closed(\u001B[38;5;28mself\u001B[39m._fp) \u001B[38;5;129;01mor\u001B[39;00m \u001B[38;5;28mlen\u001B[39m(\u001B[38;5;28mself\u001B[39m._decoded_buffer) > \u001B[32m0\u001B[39m:\n",
"\u001B[36mFile \u001B[39m\u001B[32m~/PycharmProjects/aaredb/.venv/lib/python3.12/site-packages/urllib3/response.py:1219\u001B[39m, in \u001B[36mHTTPResponse.read_chunked\u001B[39m\u001B[34m(self, amt, decode_content)\u001B[39m\n\u001B[32m 1216\u001B[39m amt = \u001B[38;5;28;01mNone\u001B[39;00m\n\u001B[32m 1218\u001B[39m \u001B[38;5;28;01mwhile\u001B[39;00m \u001B[38;5;28;01mTrue\u001B[39;00m:\n\u001B[32m-> \u001B[39m\u001B[32m1219\u001B[39m \u001B[38;5;28;43mself\u001B[39;49m\u001B[43m.\u001B[49m\u001B[43m_update_chunk_length\u001B[49m\u001B[43m(\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 1220\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;28mself\u001B[39m.chunk_left == \u001B[32m0\u001B[39m:\n\u001B[32m 1221\u001B[39m \u001B[38;5;28;01mbreak\u001B[39;00m\n",
"\u001B[36mFile \u001B[39m\u001B[32m~/PycharmProjects/aaredb/.venv/lib/python3.12/site-packages/urllib3/response.py:1138\u001B[39m, in \u001B[36mHTTPResponse._update_chunk_length\u001B[39m\u001B[34m(self)\u001B[39m\n\u001B[32m 1136\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;28mself\u001B[39m.chunk_left \u001B[38;5;129;01mis\u001B[39;00m \u001B[38;5;129;01mnot\u001B[39;00m \u001B[38;5;28;01mNone\u001B[39;00m:\n\u001B[32m 1137\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[38;5;28;01mNone\u001B[39;00m\n\u001B[32m-> \u001B[39m\u001B[32m1138\u001B[39m line = \u001B[38;5;28;43mself\u001B[39;49m\u001B[43m.\u001B[49m\u001B[43m_fp\u001B[49m\u001B[43m.\u001B[49m\u001B[43mfp\u001B[49m\u001B[43m.\u001B[49m\u001B[43mreadline\u001B[49m\u001B[43m(\u001B[49m\u001B[43m)\u001B[49m \u001B[38;5;66;03m# type: ignore[union-attr]\u001B[39;00m\n\u001B[32m 1139\u001B[39m line = line.split(\u001B[33mb\u001B[39m\u001B[33m\"\u001B[39m\u001B[33m;\u001B[39m\u001B[33m\"\u001B[39m, \u001B[32m1\u001B[39m)[\u001B[32m0\u001B[39m]\n\u001B[32m 1140\u001B[39m \u001B[38;5;28;01mtry\u001B[39;00m:\n",
"\u001B[36mFile \u001B[39m\u001B[32m~/anaconda3/lib/python3.12/socket.py:720\u001B[39m, in \u001B[36mSocketIO.readinto\u001B[39m\u001B[34m(self, b)\u001B[39m\n\u001B[32m 718\u001B[39m \u001B[38;5;28;01mwhile\u001B[39;00m \u001B[38;5;28;01mTrue\u001B[39;00m:\n\u001B[32m 719\u001B[39m \u001B[38;5;28;01mtry\u001B[39;00m:\n\u001B[32m--> \u001B[39m\u001B[32m720\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[38;5;28;43mself\u001B[39;49m\u001B[43m.\u001B[49m\u001B[43m_sock\u001B[49m\u001B[43m.\u001B[49m\u001B[43mrecv_into\u001B[49m\u001B[43m(\u001B[49m\u001B[43mb\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 721\u001B[39m \u001B[38;5;28;01mexcept\u001B[39;00m timeout:\n\u001B[32m 722\u001B[39m \u001B[38;5;28mself\u001B[39m._timeout_occurred = \u001B[38;5;28;01mTrue\u001B[39;00m\n",
"\u001B[36mFile \u001B[39m\u001B[32m~/anaconda3/lib/python3.12/ssl.py:1251\u001B[39m, in \u001B[36mSSLSocket.recv_into\u001B[39m\u001B[34m(self, buffer, nbytes, flags)\u001B[39m\n\u001B[32m 1247\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m flags != \u001B[32m0\u001B[39m:\n\u001B[32m 1248\u001B[39m \u001B[38;5;28;01mraise\u001B[39;00m \u001B[38;5;167;01mValueError\u001B[39;00m(\n\u001B[32m 1249\u001B[39m \u001B[33m\"\u001B[39m\u001B[33mnon-zero flags not allowed in calls to recv_into() on \u001B[39m\u001B[38;5;132;01m%s\u001B[39;00m\u001B[33m\"\u001B[39m %\n\u001B[32m 1250\u001B[39m \u001B[38;5;28mself\u001B[39m.\u001B[34m__class__\u001B[39m)\n\u001B[32m-> \u001B[39m\u001B[32m1251\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[38;5;28;43mself\u001B[39;49m\u001B[43m.\u001B[49m\u001B[43mread\u001B[49m\u001B[43m(\u001B[49m\u001B[43mnbytes\u001B[49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43mbuffer\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 1252\u001B[39m \u001B[38;5;28;01melse\u001B[39;00m:\n\u001B[32m 1253\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[38;5;28msuper\u001B[39m().recv_into(buffer, nbytes, flags)\n",
"\u001B[36mFile \u001B[39m\u001B[32m~/anaconda3/lib/python3.12/ssl.py:1103\u001B[39m, in \u001B[36mSSLSocket.read\u001B[39m\u001B[34m(self, len, buffer)\u001B[39m\n\u001B[32m 1101\u001B[39m \u001B[38;5;28;01mtry\u001B[39;00m:\n\u001B[32m 1102\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m buffer \u001B[38;5;129;01mis\u001B[39;00m \u001B[38;5;129;01mnot\u001B[39;00m \u001B[38;5;28;01mNone\u001B[39;00m:\n\u001B[32m-> \u001B[39m\u001B[32m1103\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[38;5;28;43mself\u001B[39;49m\u001B[43m.\u001B[49m\u001B[43m_sslobj\u001B[49m\u001B[43m.\u001B[49m\u001B[43mread\u001B[49m\u001B[43m(\u001B[49m\u001B[38;5;28;43mlen\u001B[39;49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43mbuffer\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 1104\u001B[39m \u001B[38;5;28;01melse\u001B[39;00m:\n\u001B[32m 1105\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[38;5;28mself\u001B[39m._sslobj.read(\u001B[38;5;28mlen\u001B[39m)\n",
"\u001B[31mKeyboardInterrupt\u001B[39m: "
]
}
],
"execution_count": 3
}
],
"metadata": {