Add job processing system with streaming endpoint

Introduced a `processing` router that streams pending jobs to clients via server-sent events. Added `Jobs` and `JobStatus` models for tracking job state, along with startup logic that drops and recreates the database tables. Updated the `sample` router to create a new job entry whenever experiment parameters are created.
GotthardG 2025-04-10 11:53:36 +02:00
parent f54ffd138a
commit fda9142155
5 changed files with 63 additions and 2 deletions

View File

@@ -7,10 +7,13 @@ from sqlalchemy import (
     JSON,
     DateTime,
     Boolean,
+    Enum,
+    func,
 )
 from sqlalchemy.orm import relationship
 from .database import Base
 from datetime import datetime
+import enum


 class Shipment(Base):
@@ -303,3 +306,20 @@ class Results(Base):
     # total_refl: int
     # unique_refl: int
     # #comments: Optional[constr(max_length=200)] = None
+
+
+class JobStatus(str, enum.Enum):
+    TODO = "todo"
+    SUBMITTED = "submitted"
+    DONE = "done"
+
+
+class Jobs(Base):
+    __tablename__ = "jobs"
+
+    id = Column(Integer, primary_key=True, index=True)
+    experiment_parameters_id = Column(Integer, nullable=False)
+    status = Column(Enum(JobStatus), default=JobStatus.TODO, nullable=False)
+    parameters = Column(JSON, nullable=False)
+    created_at = Column(DateTime, server_default=func.now())
+    updated_at = Column(DateTime, onupdate=func.now())
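
One detail worth flagging in the new model: `experiment_parameters_id` is a bare `Integer`, so nothing at the database level guarantees it references an existing `ExperimentParameters` row. A possible tightening, sketched here rather than taken from the commit (the `experiment_parameters` table name is an assumption):

```python
from sqlalchemy import Column, ForeignKey, Integer

# Hypothetical variant, not in this commit: enforce referential integrity.
# The table name "experiment_parameters" is assumed from the model name.
experiment_parameters_id = Column(
    Integer, ForeignKey("experiment_parameters.id"), nullable=False
)
```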

View File

@@ -5,6 +5,7 @@ from .proposal import router as proposal_router
 from .dewar import dewar_router
 from .shipment import shipment_router
 from .auth import router as auth_router
+from .processing import router as processing_router
 from .protected_router import protected_router as protected_router

 __all__ = [
@@ -15,5 +16,6 @@ __all__ = [
     "dewar_router",
     "shipment_router",
     "auth_router",
+    "processing_router",
     "protected_router",
 ]

View File

@@ -0,0 +1,24 @@
+from fastapi import APIRouter, Depends
+from sqlalchemy.orm import Session
+from starlette.responses import StreamingResponse
+import asyncio
+import json
+
+from app.models import JobStatus, Jobs as JobModel
+from app.dependencies import get_db
+
+router = APIRouter()
+
+
+async def job_event_generator(db):
+    while True:
+        jobs = db.query(JobModel).filter(JobModel.status == JobStatus.TODO).all()
+        if jobs:
+            yield json.dumps([job.parameters for job in jobs]) + "\n\n"
+        await asyncio.sleep(5)
+
+
+@router.get("/jobs/stream")
+async def stream_jobs(db: Session = Depends(get_db)):
+    return StreamingResponse(job_event_generator(db), media_type="text/event-stream")
+
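
Two practical notes on this stream: the generator yields bare JSON followed by a blank line rather than `data:`-prefixed SSE frames, so a browser `EventSource` will not parse it, though any line-oriented HTTP client can; and since nothing here moves jobs out of `TODO`, the same batch is re-sent every five seconds until a consumer updates the status. A minimal consumer sketch, assuming the service listens on `localhost:8000` and the router is mounted under the `/processing` prefix used in the application setup below:

```python
import json

import requests  # assumption: any HTTP client with response streaming works

# Read the stream line by line; each non-empty line is a JSON array holding
# the `parameters` payload of every job still in the TODO state.
with requests.get(
    "http://localhost:8000/processing/jobs/stream", stream=True, timeout=None
) as resp:
    for line in resp.iter_lines(decode_unicode=True):
        if line:  # blank lines are just event separators
            print("pending jobs:", json.loads(line))
```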

View File

@@ -28,6 +28,8 @@ from app.models import (
     ExperimentParameters as ExperimentParametersModel,
     # ExperimentParameters,
     Results as ResultsModel,
+    Jobs as JobModel,
+    JobStatus,
 )
 from app.dependencies import get_db
 import logging
@@ -348,6 +350,15 @@ def create_experiment_parameters_for_sample(
     db.add(new_event)
     db.commit()

+    new_job = JobModel(
+        experiment_parameters_id=new_exp.id,
+        parameters=new_exp.to_dict(),  # assumes the model has a to_dict() method
+        status=JobStatus.TODO,
+    )
+    db.add(new_job)
+    db.commit()
+    db.refresh(new_job)
+
     return new_exp
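
The `parameters=new_exp.to_dict()` call leans on a `to_dict()` method that the diff's own comment only assumes exists. A plausible generic implementation on the `ExperimentParameters` model, offered as a sketch rather than the project's actual code:

```python
from sqlalchemy import inspect

def to_dict(self) -> dict:
    # Generic column-to-dict serialization for a mapped instance; a sketch,
    # since the real method is not shown anywhere in this commit.
    return {
        attr.key: getattr(self, attr.key)
        for attr in inspect(self).mapper.column_attrs
    }
```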

View File

@@ -15,6 +15,7 @@ from app.routers import (
     logistics,
     auth,
     sample,
+    processing,
 )
 from app.database import Base, engine, SessionLocal
 from app.routers.protected_router import protected_router
@@ -119,6 +120,8 @@ async def lifespan(app: FastAPI):
     db = SessionLocal()
     try:
         if environment == "prod":
+            Base.metadata.drop_all(bind=engine)
+            Base.metadata.create_all(bind=engine)
             from sqlalchemy.engine import reflection

             inspector = reflection.Inspector.from_engine(engine)
@@ -143,8 +146,8 @@ async def lifespan(app: FastAPI):
             load_slots_data(db)
         else:  # dev or test environments
             print(f"{environment.capitalize()} environment: Regenerating database.")
-            # Base.metadata.drop_all(bind=engine)
-            # Base.metadata.create_all(bind=engine)
+            Base.metadata.drop_all(bind=engine)
+            Base.metadata.create_all(bind=engine)
             # from sqlalchemy.engine import reflection
             # from app.models import ExperimentParameters  # adjust the import as needed
             # inspector = reflection.Inspector.from_engine(engine)
@@ -194,6 +197,7 @@ app.include_router(puck.router, prefix="/pucks", tags=["pucks"])
 app.include_router(spreadsheet.router, tags=["spreadsheet"])
 app.include_router(logistics.router, prefix="/logistics", tags=["logistics"])
 app.include_router(sample.router, prefix="/samples", tags=["samples"])
+app.include_router(processing.router, prefix="/processing", tags=["processing"])

 app.mount("/images", StaticFiles(directory="images"), name="images")
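
Finally, the `JobStatus` enum implies a hand-off that no code in this commit performs: something must move a job from `TODO` to `SUBMITTED` once it is picked up, otherwise the stream re-sends it on every poll. A hypothetical claiming helper (names and session handling assumed, not from the codebase):

```python
from sqlalchemy.orm import Session

from app.models import JobStatus, Jobs as JobModel

def claim_job(db: Session, job_id: int) -> bool:
    # Hypothetical helper, not part of this commit: flip a TODO job to
    # SUBMITTED so the /processing/jobs/stream endpoint stops re-sending it.
    job = db.query(JobModel).filter(JobModel.id == job_id).first()
    if job is None or job.status != JobStatus.TODO:
        return False
    job.status = JobStatus.SUBMITTED
    db.commit()
    return True
```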