{ "cells": [ { "cell_type": "code", "id": "initial_id", "metadata": { "collapsed": true, "ExecuteTime": { "end_time": "2025-04-30T08:33:36.239042Z", "start_time": "2025-04-29T21:16:51.426746Z" } }, "source": [ "import requests\n", "import sseclient\n", "import json\n", "\n", "SSE_URL = \"https://127.0.0.1:8000/processing/jobs/stream\"\n", "UPDATE_URL = \"https://127.0.0.1:8000/processing/jobs/update_status\"\n", "\n", "def submit_job_update(job_id, status, slurm_id):\n", " payload = {\n", " \"job_id\": job_id,\n", " \"status\": status,\n", " \"slurm_id\": slurm_id,\n", " }\n", " try:\n", " response = requests.post(UPDATE_URL, json=payload, verify=False)\n", " if response.status_code == 200:\n", " print(f\"✅ Job {job_id} status updated to '{status}'. Response: {response.json()}\")\n", " else:\n", " print(f\"❌ Failed to update job {job_id}. Status: {response.status_code}. Response: {response.text}\")\n", " except Exception as e:\n", " print(f\"Failed to submit update for Job {job_id}: {e}\")\n", "\n", "def listen_and_update_jobs(url):\n", " print(\"Starting job status updater...\")\n", " with requests.get(url, stream=True, verify=False) as response:\n", " if response.status_code != 200:\n", " print(f\"Failed to connect with status code: {response.status_code}\")\n", " return\n", "\n", " client = sseclient.SSEClient(response)\n", "\n", " for event in client.events():\n", " try:\n", " jobs = json.loads(event.data)\n", " print(f\"Jobs received: {jobs}\")\n", "\n", " for job in jobs:\n", " job_id = job.get(\"job_id\")\n", " print(f\"Job ID: {job_id}, Current status: {job.get('status')}\")\n", " # Immediately update status to \"submitted\"\n", " submit_job_update(job_id, \"submitted\", 76545678)\n", " except json.JSONDecodeError as e:\n", " print(f\"Error decoding event data: {e}\")\n", " except Exception as e:\n", " print(f\"Unexpected error while processing event: {e}\")\n", "\n", "if __name__ == \"__main__\":\n", " listen_and_update_jobs(SSE_URL)\n" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting job status updater...\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/urllib3/connectionpool.py:1103: InsecureRequestWarning: Unverified HTTPS request is being made to host '127.0.0.1'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#tls-warnings\n", " warnings.warn(\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Jobs received: [{'job_id': 1, 'sample_id': 204, 'run_id': 1, 'sample_name': 'Sample204', 'status': 'todo', 'type': 'default_type', 'created_at': '2025-04-29T21:17:02.847967', 'updated_at': None, 'data_collection_parameters': None, 'experiment_parameters': {'synchrotron': 'Swiss Light Source', 'beamline': 'PXIII', 'detector': {'manufacturer': 'DECTRIS', 'model': 'PILATUS4 2M', 'type': 'photon-counting', 'serialNumber': '16684dscsd668468', 'detectorDistance_mm': 95.0, 'beamCenterX_px': 512.0, 'beamCenterY_px': 512.0, 'pixelSizeX_um': 150.0, 'pixelSizeY_um': 150.0}, 'wavelength': 1.0, 'ringCurrent_A': 0.0, 'ringMode': 'Machine Down', 'undulator': None, 'undulatorgap_mm': None, 'monochromator': 'Si111', 'transmission': 1.0, 'focusingOptic': 'Kirkpatrick-Baez', 'beamlineFluxAtSample_ph_s': 0.0, 'beamSizeWidth': 30.0, 'beamSizeHeight': 30.0, 'characterization': None, 'rotation': {'omegaStart_deg': 0.0, 'omegaStep': 0.1, 'chi': 0.0, 'phi': 10.0, 'numberOfImages': 3600, 'exposureTime_s': 0.02}, 'gridScan': None, 'jet': None, 'cryojetTemperature_K': None, 'humidifierTemperature_K': None, 'humidifierHumidity': None}, 'slurm_id': None}]\n", "Job ID: 1, Current status: todo\n", "✅ Job 1 status updated to 'submitted'. Response: {'job_id': 1, 'status': 'submitted', 'slurm_id': 76545678}\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/urllib3/connectionpool.py:1103: InsecureRequestWarning: Unverified HTTPS request is being made to host '127.0.0.1'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#tls-warnings\n", " warnings.warn(\n" ] }, { "ename": "ChunkedEncodingError", "evalue": "Response ended prematurely", "output_type": "error", "traceback": [ "\u001B[0;31m---------------------------------------------------------------------------\u001B[0m", "\u001B[0;31mProtocolError\u001B[0m Traceback (most recent call last)", "File \u001B[0;32m/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/requests/models.py:820\u001B[0m, in \u001B[0;36mResponse.iter_content..generate\u001B[0;34m()\u001B[0m\n\u001B[1;32m 819\u001B[0m \u001B[38;5;28;01mtry\u001B[39;00m:\n\u001B[0;32m--> 820\u001B[0m \u001B[38;5;28;01myield from\u001B[39;00m \u001B[38;5;28mself\u001B[39m\u001B[38;5;241m.\u001B[39mraw\u001B[38;5;241m.\u001B[39mstream(chunk_size, decode_content\u001B[38;5;241m=\u001B[39m\u001B[38;5;28;01mTrue\u001B[39;00m)\n\u001B[1;32m 821\u001B[0m \u001B[38;5;28;01mexcept\u001B[39;00m ProtocolError \u001B[38;5;28;01mas\u001B[39;00m e:\n", "File \u001B[0;32m/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/urllib3/response.py:1040\u001B[0m, in \u001B[0;36mHTTPResponse.stream\u001B[0;34m(self, amt, decode_content)\u001B[0m\n\u001B[1;32m 1039\u001B[0m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;28mself\u001B[39m\u001B[38;5;241m.\u001B[39mchunked \u001B[38;5;129;01mand\u001B[39;00m \u001B[38;5;28mself\u001B[39m\u001B[38;5;241m.\u001B[39msupports_chunked_reads():\n\u001B[0;32m-> 1040\u001B[0m \u001B[38;5;28;01myield from\u001B[39;00m \u001B[38;5;28mself\u001B[39m\u001B[38;5;241m.\u001B[39mread_chunked(amt, decode_content\u001B[38;5;241m=\u001B[39mdecode_content)\n\u001B[1;32m 1041\u001B[0m \u001B[38;5;28;01melse\u001B[39;00m:\n", "File \u001B[0;32m/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/urllib3/response.py:1184\u001B[0m, in \u001B[0;36mHTTPResponse.read_chunked\u001B[0;34m(self, amt, decode_content)\u001B[0m\n\u001B[1;32m 1183\u001B[0m \u001B[38;5;28;01mwhile\u001B[39;00m \u001B[38;5;28;01mTrue\u001B[39;00m:\n\u001B[0;32m-> 1184\u001B[0m \u001B[38;5;28;43mself\u001B[39;49m\u001B[38;5;241;43m.\u001B[39;49m\u001B[43m_update_chunk_length\u001B[49m\u001B[43m(\u001B[49m\u001B[43m)\u001B[49m\n\u001B[1;32m 1185\u001B[0m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;28mself\u001B[39m\u001B[38;5;241m.\u001B[39mchunk_left \u001B[38;5;241m==\u001B[39m \u001B[38;5;241m0\u001B[39m:\n", "File \u001B[0;32m/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/urllib3/response.py:1119\u001B[0m, in \u001B[0;36mHTTPResponse._update_chunk_length\u001B[0;34m(self)\u001B[0m\n\u001B[1;32m 1117\u001B[0m \u001B[38;5;28;01melse\u001B[39;00m:\n\u001B[1;32m 1118\u001B[0m \u001B[38;5;66;03m# Truncated at start of next chunk\u001B[39;00m\n\u001B[0;32m-> 1119\u001B[0m \u001B[38;5;28;01mraise\u001B[39;00m ProtocolError(\u001B[38;5;124m\"\u001B[39m\u001B[38;5;124mResponse ended prematurely\u001B[39m\u001B[38;5;124m\"\u001B[39m) \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mNone\u001B[39;00m\n", "\u001B[0;31mProtocolError\u001B[0m: Response ended prematurely", "\nDuring handling of the above exception, another exception occurred:\n", "\u001B[0;31mChunkedEncodingError\u001B[0m Traceback (most recent call last)", "Cell \u001B[0;32mIn[12], line 48\u001B[0m\n\u001B[1;32m 45\u001B[0m \u001B[38;5;28mprint\u001B[39m(\u001B[38;5;124mf\u001B[39m\u001B[38;5;124m\"\u001B[39m\u001B[38;5;124mUnexpected error while processing event: \u001B[39m\u001B[38;5;132;01m{\u001B[39;00me\u001B[38;5;132;01m}\u001B[39;00m\u001B[38;5;124m\"\u001B[39m)\n\u001B[1;32m 47\u001B[0m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;18m__name__\u001B[39m \u001B[38;5;241m==\u001B[39m \u001B[38;5;124m\"\u001B[39m\u001B[38;5;124m__main__\u001B[39m\u001B[38;5;124m\"\u001B[39m:\n\u001B[0;32m---> 48\u001B[0m \u001B[43mlisten_and_update_jobs\u001B[49m\u001B[43m(\u001B[49m\u001B[43mSSE_URL\u001B[49m\u001B[43m)\u001B[49m\n", "Cell \u001B[0;32mIn[12], line 32\u001B[0m, in \u001B[0;36mlisten_and_update_jobs\u001B[0;34m(url)\u001B[0m\n\u001B[1;32m 28\u001B[0m \u001B[38;5;28;01mreturn\u001B[39;00m\n\u001B[1;32m 30\u001B[0m client \u001B[38;5;241m=\u001B[39m sseclient\u001B[38;5;241m.\u001B[39mSSEClient(response)\n\u001B[0;32m---> 32\u001B[0m \u001B[43m\u001B[49m\u001B[38;5;28;43;01mfor\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mevent\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;129;43;01min\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mclient\u001B[49m\u001B[38;5;241;43m.\u001B[39;49m\u001B[43mevents\u001B[49m\u001B[43m(\u001B[49m\u001B[43m)\u001B[49m\u001B[43m:\u001B[49m\n\u001B[1;32m 33\u001B[0m \u001B[43m \u001B[49m\u001B[38;5;28;43;01mtry\u001B[39;49;00m\u001B[43m:\u001B[49m\n\u001B[1;32m 34\u001B[0m \u001B[43m \u001B[49m\u001B[43mjobs\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[43m \u001B[49m\u001B[43mjson\u001B[49m\u001B[38;5;241;43m.\u001B[39;49m\u001B[43mloads\u001B[49m\u001B[43m(\u001B[49m\u001B[43mevent\u001B[49m\u001B[38;5;241;43m.\u001B[39;49m\u001B[43mdata\u001B[49m\u001B[43m)\u001B[49m\n", "File \u001B[0;32m/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/sseclient/__init__.py:55\u001B[0m, in \u001B[0;36mSSEClient.events\u001B[0;34m(self)\u001B[0m\n\u001B[1;32m 54\u001B[0m \u001B[38;5;28;01mdef\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;21mevents\u001B[39m(\u001B[38;5;28mself\u001B[39m):\n\u001B[0;32m---> 55\u001B[0m \u001B[43m \u001B[49m\u001B[38;5;28;43;01mfor\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mchunk\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;129;43;01min\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[38;5;28;43mself\u001B[39;49m\u001B[38;5;241;43m.\u001B[39;49m\u001B[43m_read\u001B[49m\u001B[43m(\u001B[49m\u001B[43m)\u001B[49m\u001B[43m:\u001B[49m\n\u001B[1;32m 56\u001B[0m \u001B[43m \u001B[49m\u001B[43mevent\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[43m \u001B[49m\u001B[43mEvent\u001B[49m\u001B[43m(\u001B[49m\u001B[43m)\u001B[49m\n\u001B[1;32m 57\u001B[0m \u001B[43m \u001B[49m\u001B[38;5;66;43;03m# Split before decoding so splitlines() only uses \\r and \\n\u001B[39;49;00m\n", "File \u001B[0;32m/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/sseclient/__init__.py:45\u001B[0m, in \u001B[0;36mSSEClient._read\u001B[0;34m(self)\u001B[0m\n\u001B[1;32m 38\u001B[0m \u001B[38;5;250m\u001B[39m\u001B[38;5;124;03m\"\"\"Read the incoming event source stream and yield event chunks.\u001B[39;00m\n\u001B[1;32m 39\u001B[0m \n\u001B[1;32m 40\u001B[0m \u001B[38;5;124;03mUnfortunately it is possible for some servers to decide to break an\u001B[39;00m\n\u001B[1;32m 41\u001B[0m \u001B[38;5;124;03mevent into multiple HTTP chunks in the response. It is thus necessary\u001B[39;00m\n\u001B[1;32m 42\u001B[0m \u001B[38;5;124;03mto correctly stitch together consecutive response chunks and find the\u001B[39;00m\n\u001B[1;32m 43\u001B[0m \u001B[38;5;124;03mSSE delimiter (empty new line) to yield full, correct event chunks.\"\"\"\u001B[39;00m\n\u001B[1;32m 44\u001B[0m data \u001B[38;5;241m=\u001B[39m \u001B[38;5;124mb\u001B[39m\u001B[38;5;124m'\u001B[39m\u001B[38;5;124m'\u001B[39m\n\u001B[0;32m---> 45\u001B[0m \u001B[43m\u001B[49m\u001B[38;5;28;43;01mfor\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mchunk\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;129;43;01min\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[38;5;28;43mself\u001B[39;49m\u001B[38;5;241;43m.\u001B[39;49m\u001B[43m_event_source\u001B[49m\u001B[43m:\u001B[49m\n\u001B[1;32m 46\u001B[0m \u001B[43m \u001B[49m\u001B[38;5;28;43;01mfor\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mline\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;129;43;01min\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mchunk\u001B[49m\u001B[38;5;241;43m.\u001B[39;49m\u001B[43msplitlines\u001B[49m\u001B[43m(\u001B[49m\u001B[38;5;28;43;01mTrue\u001B[39;49;00m\u001B[43m)\u001B[49m\u001B[43m:\u001B[49m\n\u001B[1;32m 47\u001B[0m \u001B[43m \u001B[49m\u001B[43mdata\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;241;43m+\u001B[39;49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[43m \u001B[49m\u001B[43mline\u001B[49m\n", "File \u001B[0;32m/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/requests/models.py:822\u001B[0m, in \u001B[0;36mResponse.iter_content..generate\u001B[0;34m()\u001B[0m\n\u001B[1;32m 820\u001B[0m \u001B[38;5;28;01myield from\u001B[39;00m \u001B[38;5;28mself\u001B[39m\u001B[38;5;241m.\u001B[39mraw\u001B[38;5;241m.\u001B[39mstream(chunk_size, decode_content\u001B[38;5;241m=\u001B[39m\u001B[38;5;28;01mTrue\u001B[39;00m)\n\u001B[1;32m 821\u001B[0m \u001B[38;5;28;01mexcept\u001B[39;00m ProtocolError \u001B[38;5;28;01mas\u001B[39;00m e:\n\u001B[0;32m--> 822\u001B[0m \u001B[38;5;28;01mraise\u001B[39;00m ChunkedEncodingError(e)\n\u001B[1;32m 823\u001B[0m \u001B[38;5;28;01mexcept\u001B[39;00m DecodeError \u001B[38;5;28;01mas\u001B[39;00m e:\n\u001B[1;32m 824\u001B[0m \u001B[38;5;28;01mraise\u001B[39;00m ContentDecodingError(e)\n", "\u001B[0;31mChunkedEncodingError\u001B[0m: Response ended prematurely" ] } ], "execution_count": 12 } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.6" } }, "nbformat": 4, "nbformat_minor": 5 }