From 866139baea458696a6a1fbfc5882328005a53f8b Mon Sep 17 00:00:00 2001 From: GotthardG <51994228+GotthardG@users.noreply.github.com> Date: Fri, 11 Apr 2025 13:59:46 +0200 Subject: [PATCH] Added a test function for SSE --- backend/app/routers/processing.py | 16 ++-- testfunctions.ipynb | 121 ++++++++++++++++++++++++++++++ 2 files changed, 130 insertions(+), 7 deletions(-) diff --git a/backend/app/routers/processing.py b/backend/app/routers/processing.py index 62607ad..71f4176 100644 --- a/backend/app/routers/processing.py +++ b/backend/app/routers/processing.py @@ -1,25 +1,27 @@ +import json +import asyncio from fastapi import APIRouter, Depends from sqlalchemy.orm import Session from starlette.responses import StreamingResponse -import asyncio -import json - from app.models import JobStatus, Jobs as JobModel from app.dependencies import get_db router = APIRouter() -async def job_event_generator(db): +async def job_event_generator(db: Session): while True: + # Fetch jobs with status TODO jobs = db.query(JobModel).filter(JobModel.status == JobStatus.TODO).all() if jobs: - yield json.dumps([job.parameters for job in jobs]) + "\n\n" - await asyncio.sleep(5) + # It's recommended to explicitly communicate IDs clearly + job_payload = [{"id": job.id, "parameters": job.parameters} for job in jobs] + yield f"data: {json.dumps(job_payload)}\n\n" + + await asyncio.sleep(5) # A reasonable heartbeat/refresh @router.get("/jobs/stream") async def stream_jobs(db: Session = Depends(get_db)): - # test return StreamingResponse(job_event_generator(db), media_type="text/event-stream") diff --git a/testfunctions.ipynb b/testfunctions.ipynb index 8168e44..8649c1e 100644 --- a/testfunctions.ipynb +++ b/testfunctions.ipynb @@ -935,6 +935,127 @@ } ], "execution_count": 6 + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2025-04-11T11:57:22.504015Z", + "start_time": "2025-04-11T11:56:30.723778Z" + } + }, + "cell_type": "code", + "source": [ + "import requests\n", + "import sseclient\n", + "import json\n", + "\n", + "SSE_URL = \"https://127.0.0.1:8000/processing/jobs/stream\" # Replace clearly according to backend\n", + "\n", + "\n", + "def listen_and_process_jobs(url):\n", + " print(\"Starting processing pipeline...\")\n", + " with requests.get(url, stream=True, verify=False) as response:\n", + " if response.status_code != 200:\n", + " print(f\"Failed to connect with status code: {response.status_code}\")\n", + " return\n", + "\n", + " client = sseclient.SSEClient(response)\n", + "\n", + " for event in client.events():\n", + " try:\n", + " jobs = json.loads(event.data)\n", + " print(f\"Jobs received: {jobs}\")\n", + "\n", + " for job in jobs:\n", + " job_id = job[\"id\"]\n", + " parameters = job[\"parameters\"]\n", + " print(f\"Processing job ID: {job_id} with parameters: {parameters}\")\n", + "\n", + " # TODO: your custom job-processing logic clearly goes here\n", + " process_job_logic(job_id, parameters)\n", + "\n", + " # After job processing completes, send results & update job status\n", + " submit_job_result(job_id, processing_result={'success': True})\n", + " except json.JSONDecodeError as e:\n", + " print(f\"Error decoding event data: {e}\")\n", + "\n", + "\n", + "def process_job_logic(job_id, parameters):\n", + " # Here: insert your application-specific processing logic\n", + " print(f\"📦 {job_id} - Processing detailed logic clearly here: {parameters}\")\n", + " # simulate execution:\n", + " import time\n", + " time.sleep(5) # Simulating task clearly\n", + " print(f\"✅ Job {job_id} processing complete\")\n", + "\n", + "\n", + "def submit_job_result(job_id, processing_result):\n", + " # Using autogenerated client or direct HTTP calls clearly:\n", + " import backend.aareDBclient as aareDBclient\n", + " from aareDBclient.models import ResultCreate\n", + "\n", + " configuration = aareDBclient.Configuration(\n", + " host=\"https://127.0.0.1:8000\",\n", + " verify_ssl=False\n", + " )\n", + "\n", + " with aareDBclient.ApiClient(configuration) as api_client:\n", + " api_instance = aareDBclient.ProcessingApi(api_client) # assuming this exists!\n", + "\n", + " result = ResultCreate(\n", + " job_id=job_id, # Assuming you have such a field clearly\n", + " result=processing_result\n", + " )\n", + " try:\n", + " res = api_instance.submit_processing_result(result_create=result)\n", + " print(f\"Job {job_id} result submitted successfully! Backend Response: {res}\")\n", + " except aareDBclient.rest.ApiException as e:\n", + " print(f\"Failed to submit result for Job {job_id}: {e}\")\n", + "\n", + "\n", + "if __name__ == \"__main__\":\n", + " listen_and_process_jobs(SSE_URL)\n" + ], + "id": "aef43f1265a23cb1", + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Starting processing pipeline...\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/gotthardg/PycharmProjects/aaredb/.venv/lib/python3.12/site-packages/urllib3/connectionpool.py:1097: InsecureRequestWarning: Unverified HTTPS request is being made to host '127.0.0.1'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#tls-warnings\n", + " warnings.warn(\n" + ] + }, + { + "ename": "KeyboardInterrupt", + "evalue": "", + "output_type": "error", + "traceback": [ + "\u001B[31m---------------------------------------------------------------------------\u001B[39m", + "\u001B[31mKeyboardInterrupt\u001B[39m Traceback (most recent call last)", + "\u001B[36mCell\u001B[39m\u001B[36m \u001B[39m\u001B[32mIn[3]\u001B[39m\u001B[32m, line 70\u001B[39m\n\u001B[32m 66\u001B[39m \u001B[38;5;28mprint\u001B[39m(\u001B[33mf\u001B[39m\u001B[33m\"\u001B[39m\u001B[33mFailed to submit result for Job \u001B[39m\u001B[38;5;132;01m{\u001B[39;00mjob_id\u001B[38;5;132;01m}\u001B[39;00m\u001B[33m: \u001B[39m\u001B[38;5;132;01m{\u001B[39;00me\u001B[38;5;132;01m}\u001B[39;00m\u001B[33m\"\u001B[39m)\n\u001B[32m 69\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m \u001B[34m__name__\u001B[39m == \u001B[33m\"\u001B[39m\u001B[33m__main__\u001B[39m\u001B[33m\"\u001B[39m:\n\u001B[32m---> \u001B[39m\u001B[32m70\u001B[39m \u001B[43mlisten_and_process_jobs\u001B[49m\u001B[43m(\u001B[49m\u001B[43mSSE_URL\u001B[49m\u001B[43m)\u001B[49m\n", + "\u001B[36mCell\u001B[39m\u001B[36m \u001B[39m\u001B[32mIn[3]\u001B[39m\u001B[32m, line 17\u001B[39m, in \u001B[36mlisten_and_process_jobs\u001B[39m\u001B[34m(url)\u001B[39m\n\u001B[32m 13\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m\n\u001B[32m 15\u001B[39m client = sseclient.SSEClient(response)\n\u001B[32m---> \u001B[39m\u001B[32m17\u001B[39m \u001B[43m\u001B[49m\u001B[38;5;28;43;01mfor\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mevent\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;129;43;01min\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mclient\u001B[49m\u001B[43m.\u001B[49m\u001B[43mevents\u001B[49m\u001B[43m(\u001B[49m\u001B[43m)\u001B[49m\u001B[43m:\u001B[49m\n\u001B[32m 18\u001B[39m \u001B[43m \u001B[49m\u001B[38;5;28;43;01mtry\u001B[39;49;00m\u001B[43m:\u001B[49m\n\u001B[32m 19\u001B[39m \u001B[43m \u001B[49m\u001B[43mjobs\u001B[49m\u001B[43m \u001B[49m\u001B[43m=\u001B[49m\u001B[43m \u001B[49m\u001B[43mjson\u001B[49m\u001B[43m.\u001B[49m\u001B[43mloads\u001B[49m\u001B[43m(\u001B[49m\u001B[43mevent\u001B[49m\u001B[43m.\u001B[49m\u001B[43mdata\u001B[49m\u001B[43m)\u001B[49m\n", + "\u001B[36mFile \u001B[39m\u001B[32m~/PycharmProjects/aaredb/.venv/lib/python3.12/site-packages/sseclient/__init__.py:55\u001B[39m, in \u001B[36mSSEClient.events\u001B[39m\u001B[34m(self)\u001B[39m\n\u001B[32m 54\u001B[39m \u001B[38;5;28;01mdef\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34mevents\u001B[39m(\u001B[38;5;28mself\u001B[39m):\n\u001B[32m---> \u001B[39m\u001B[32m55\u001B[39m \u001B[43m \u001B[49m\u001B[38;5;28;43;01mfor\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mchunk\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;129;43;01min\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[38;5;28;43mself\u001B[39;49m\u001B[43m.\u001B[49m\u001B[43m_read\u001B[49m\u001B[43m(\u001B[49m\u001B[43m)\u001B[49m\u001B[43m:\u001B[49m\n\u001B[32m 56\u001B[39m \u001B[43m \u001B[49m\u001B[43mevent\u001B[49m\u001B[43m \u001B[49m\u001B[43m=\u001B[49m\u001B[43m \u001B[49m\u001B[43mEvent\u001B[49m\u001B[43m(\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 57\u001B[39m \u001B[43m \u001B[49m\u001B[38;5;66;43;03m# Split before decoding so splitlines() only uses \\r and \\n\u001B[39;49;00m\n", + "\u001B[36mFile \u001B[39m\u001B[32m~/PycharmProjects/aaredb/.venv/lib/python3.12/site-packages/sseclient/__init__.py:45\u001B[39m, in \u001B[36mSSEClient._read\u001B[39m\u001B[34m(self)\u001B[39m\n\u001B[32m 38\u001B[39m \u001B[38;5;250m\u001B[39m\u001B[33;03m\"\"\"Read the incoming event source stream and yield event chunks.\u001B[39;00m\n\u001B[32m 39\u001B[39m \n\u001B[32m 40\u001B[39m \u001B[33;03mUnfortunately it is possible for some servers to decide to break an\u001B[39;00m\n\u001B[32m 41\u001B[39m \u001B[33;03mevent into multiple HTTP chunks in the response. It is thus necessary\u001B[39;00m\n\u001B[32m 42\u001B[39m \u001B[33;03mto correctly stitch together consecutive response chunks and find the\u001B[39;00m\n\u001B[32m 43\u001B[39m \u001B[33;03mSSE delimiter (empty new line) to yield full, correct event chunks.\"\"\"\u001B[39;00m\n\u001B[32m 44\u001B[39m data = \u001B[33mb\u001B[39m\u001B[33m'\u001B[39m\u001B[33m'\u001B[39m\n\u001B[32m---> \u001B[39m\u001B[32m45\u001B[39m \u001B[43m\u001B[49m\u001B[38;5;28;43;01mfor\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mchunk\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;129;43;01min\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[38;5;28;43mself\u001B[39;49m\u001B[43m.\u001B[49m\u001B[43m_event_source\u001B[49m\u001B[43m:\u001B[49m\n\u001B[32m 46\u001B[39m \u001B[43m \u001B[49m\u001B[38;5;28;43;01mfor\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mline\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;129;43;01min\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mchunk\u001B[49m\u001B[43m.\u001B[49m\u001B[43msplitlines\u001B[49m\u001B[43m(\u001B[49m\u001B[38;5;28;43;01mTrue\u001B[39;49;00m\u001B[43m)\u001B[49m\u001B[43m:\u001B[49m\n\u001B[32m 47\u001B[39m \u001B[43m \u001B[49m\u001B[43mdata\u001B[49m\u001B[43m \u001B[49m\u001B[43m+\u001B[49m\u001B[43m=\u001B[49m\u001B[43m \u001B[49m\u001B[43mline\u001B[49m\n", + "\u001B[36mFile \u001B[39m\u001B[32m~/PycharmProjects/aaredb/.venv/lib/python3.12/site-packages/requests/models.py:820\u001B[39m, in \u001B[36mResponse.iter_content..generate\u001B[39m\u001B[34m()\u001B[39m\n\u001B[32m 818\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;28mhasattr\u001B[39m(\u001B[38;5;28mself\u001B[39m.raw, \u001B[33m\"\u001B[39m\u001B[33mstream\u001B[39m\u001B[33m\"\u001B[39m):\n\u001B[32m 819\u001B[39m \u001B[38;5;28;01mtry\u001B[39;00m:\n\u001B[32m--> \u001B[39m\u001B[32m820\u001B[39m \u001B[38;5;28;01myield from\u001B[39;00m \u001B[38;5;28mself\u001B[39m.raw.stream(chunk_size, decode_content=\u001B[38;5;28;01mTrue\u001B[39;00m)\n\u001B[32m 821\u001B[39m \u001B[38;5;28;01mexcept\u001B[39;00m ProtocolError \u001B[38;5;28;01mas\u001B[39;00m e:\n\u001B[32m 822\u001B[39m \u001B[38;5;28;01mraise\u001B[39;00m ChunkedEncodingError(e)\n", + "\u001B[36mFile \u001B[39m\u001B[32m~/PycharmProjects/aaredb/.venv/lib/python3.12/site-packages/urllib3/response.py:1063\u001B[39m, in \u001B[36mHTTPResponse.stream\u001B[39m\u001B[34m(self, amt, decode_content)\u001B[39m\n\u001B[32m 1047\u001B[39m \u001B[38;5;250m\u001B[39m\u001B[33;03m\"\"\"\u001B[39;00m\n\u001B[32m 1048\u001B[39m \u001B[33;03mA generator wrapper for the read() method. A call will block until\u001B[39;00m\n\u001B[32m 1049\u001B[39m \u001B[33;03m``amt`` bytes have been read from the connection or until the\u001B[39;00m\n\u001B[32m (...)\u001B[39m\u001B[32m 1060\u001B[39m \u001B[33;03m 'content-encoding' header.\u001B[39;00m\n\u001B[32m 1061\u001B[39m \u001B[33;03m\"\"\"\u001B[39;00m\n\u001B[32m 1062\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;28mself\u001B[39m.chunked \u001B[38;5;129;01mand\u001B[39;00m \u001B[38;5;28mself\u001B[39m.supports_chunked_reads():\n\u001B[32m-> \u001B[39m\u001B[32m1063\u001B[39m \u001B[38;5;28;01myield from\u001B[39;00m \u001B[38;5;28mself\u001B[39m.read_chunked(amt, decode_content=decode_content)\n\u001B[32m 1064\u001B[39m \u001B[38;5;28;01melse\u001B[39;00m:\n\u001B[32m 1065\u001B[39m \u001B[38;5;28;01mwhile\u001B[39;00m \u001B[38;5;129;01mnot\u001B[39;00m is_fp_closed(\u001B[38;5;28mself\u001B[39m._fp) \u001B[38;5;129;01mor\u001B[39;00m \u001B[38;5;28mlen\u001B[39m(\u001B[38;5;28mself\u001B[39m._decoded_buffer) > \u001B[32m0\u001B[39m:\n", + "\u001B[36mFile \u001B[39m\u001B[32m~/PycharmProjects/aaredb/.venv/lib/python3.12/site-packages/urllib3/response.py:1219\u001B[39m, in \u001B[36mHTTPResponse.read_chunked\u001B[39m\u001B[34m(self, amt, decode_content)\u001B[39m\n\u001B[32m 1216\u001B[39m amt = \u001B[38;5;28;01mNone\u001B[39;00m\n\u001B[32m 1218\u001B[39m \u001B[38;5;28;01mwhile\u001B[39;00m \u001B[38;5;28;01mTrue\u001B[39;00m:\n\u001B[32m-> \u001B[39m\u001B[32m1219\u001B[39m \u001B[38;5;28;43mself\u001B[39;49m\u001B[43m.\u001B[49m\u001B[43m_update_chunk_length\u001B[49m\u001B[43m(\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 1220\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;28mself\u001B[39m.chunk_left == \u001B[32m0\u001B[39m:\n\u001B[32m 1221\u001B[39m \u001B[38;5;28;01mbreak\u001B[39;00m\n", + "\u001B[36mFile \u001B[39m\u001B[32m~/PycharmProjects/aaredb/.venv/lib/python3.12/site-packages/urllib3/response.py:1138\u001B[39m, in \u001B[36mHTTPResponse._update_chunk_length\u001B[39m\u001B[34m(self)\u001B[39m\n\u001B[32m 1136\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;28mself\u001B[39m.chunk_left \u001B[38;5;129;01mis\u001B[39;00m \u001B[38;5;129;01mnot\u001B[39;00m \u001B[38;5;28;01mNone\u001B[39;00m:\n\u001B[32m 1137\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[38;5;28;01mNone\u001B[39;00m\n\u001B[32m-> \u001B[39m\u001B[32m1138\u001B[39m line = \u001B[38;5;28;43mself\u001B[39;49m\u001B[43m.\u001B[49m\u001B[43m_fp\u001B[49m\u001B[43m.\u001B[49m\u001B[43mfp\u001B[49m\u001B[43m.\u001B[49m\u001B[43mreadline\u001B[49m\u001B[43m(\u001B[49m\u001B[43m)\u001B[49m \u001B[38;5;66;03m# type: ignore[union-attr]\u001B[39;00m\n\u001B[32m 1139\u001B[39m line = line.split(\u001B[33mb\u001B[39m\u001B[33m\"\u001B[39m\u001B[33m;\u001B[39m\u001B[33m\"\u001B[39m, \u001B[32m1\u001B[39m)[\u001B[32m0\u001B[39m]\n\u001B[32m 1140\u001B[39m \u001B[38;5;28;01mtry\u001B[39;00m:\n", + "\u001B[36mFile \u001B[39m\u001B[32m~/anaconda3/lib/python3.12/socket.py:720\u001B[39m, in \u001B[36mSocketIO.readinto\u001B[39m\u001B[34m(self, b)\u001B[39m\n\u001B[32m 718\u001B[39m \u001B[38;5;28;01mwhile\u001B[39;00m \u001B[38;5;28;01mTrue\u001B[39;00m:\n\u001B[32m 719\u001B[39m \u001B[38;5;28;01mtry\u001B[39;00m:\n\u001B[32m--> \u001B[39m\u001B[32m720\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[38;5;28;43mself\u001B[39;49m\u001B[43m.\u001B[49m\u001B[43m_sock\u001B[49m\u001B[43m.\u001B[49m\u001B[43mrecv_into\u001B[49m\u001B[43m(\u001B[49m\u001B[43mb\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 721\u001B[39m \u001B[38;5;28;01mexcept\u001B[39;00m timeout:\n\u001B[32m 722\u001B[39m \u001B[38;5;28mself\u001B[39m._timeout_occurred = \u001B[38;5;28;01mTrue\u001B[39;00m\n", + "\u001B[36mFile \u001B[39m\u001B[32m~/anaconda3/lib/python3.12/ssl.py:1251\u001B[39m, in \u001B[36mSSLSocket.recv_into\u001B[39m\u001B[34m(self, buffer, nbytes, flags)\u001B[39m\n\u001B[32m 1247\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m flags != \u001B[32m0\u001B[39m:\n\u001B[32m 1248\u001B[39m \u001B[38;5;28;01mraise\u001B[39;00m \u001B[38;5;167;01mValueError\u001B[39;00m(\n\u001B[32m 1249\u001B[39m \u001B[33m\"\u001B[39m\u001B[33mnon-zero flags not allowed in calls to recv_into() on \u001B[39m\u001B[38;5;132;01m%s\u001B[39;00m\u001B[33m\"\u001B[39m %\n\u001B[32m 1250\u001B[39m \u001B[38;5;28mself\u001B[39m.\u001B[34m__class__\u001B[39m)\n\u001B[32m-> \u001B[39m\u001B[32m1251\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[38;5;28;43mself\u001B[39;49m\u001B[43m.\u001B[49m\u001B[43mread\u001B[49m\u001B[43m(\u001B[49m\u001B[43mnbytes\u001B[49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43mbuffer\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 1252\u001B[39m \u001B[38;5;28;01melse\u001B[39;00m:\n\u001B[32m 1253\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[38;5;28msuper\u001B[39m().recv_into(buffer, nbytes, flags)\n", + "\u001B[36mFile \u001B[39m\u001B[32m~/anaconda3/lib/python3.12/ssl.py:1103\u001B[39m, in \u001B[36mSSLSocket.read\u001B[39m\u001B[34m(self, len, buffer)\u001B[39m\n\u001B[32m 1101\u001B[39m \u001B[38;5;28;01mtry\u001B[39;00m:\n\u001B[32m 1102\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m buffer \u001B[38;5;129;01mis\u001B[39;00m \u001B[38;5;129;01mnot\u001B[39;00m \u001B[38;5;28;01mNone\u001B[39;00m:\n\u001B[32m-> \u001B[39m\u001B[32m1103\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[38;5;28;43mself\u001B[39;49m\u001B[43m.\u001B[49m\u001B[43m_sslobj\u001B[49m\u001B[43m.\u001B[49m\u001B[43mread\u001B[49m\u001B[43m(\u001B[49m\u001B[38;5;28;43mlen\u001B[39;49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43mbuffer\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 1104\u001B[39m \u001B[38;5;28;01melse\u001B[39;00m:\n\u001B[32m 1105\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[38;5;28mself\u001B[39m._sslobj.read(\u001B[38;5;28mlen\u001B[39m)\n", + "\u001B[31mKeyboardInterrupt\u001B[39m: " + ] + } + ], + "execution_count": 3 } ], "metadata": {