Refactor OpenAPI fetcher for improved clarity and robustness
Reorganized and enhanced the OpenAPI fetch logic for better maintainability and error handling. Key updates include improved environment variable validation, more detailed error messages, streamlined configuration loading, and additional safety checks for file paths and directories. Added proper logging and ensured the process flow is easy to trace.
This commit is contained in:
@ -1,7 +1,13 @@
|
||||
from fastapi import APIRouter, HTTPException, Depends
|
||||
from sqlalchemy.orm import Session
|
||||
from typing import List
|
||||
from app.schemas import Puck as PuckSchema, Sample as SampleSchema
|
||||
from datetime import datetime
|
||||
from app.schemas import (
|
||||
Puck as PuckSchema,
|
||||
Sample as SampleSchema,
|
||||
SampleEventResponse,
|
||||
SampleEventCreate,
|
||||
)
|
||||
from app.models import (
|
||||
Puck as PuckModel,
|
||||
Sample as SampleModel,
|
||||
@ -45,3 +51,50 @@ async def get_all_pucks_with_samples_and_events(db: Session = Depends(get_db)):
|
||||
status_code=404, detail="No pucks found in the database"
|
||||
) # More descriptive
|
||||
return pucks
|
||||
|
||||
|
||||
# Route to post a new sample event
|
||||
@router.post("/samples/{sample_id}/events", response_model=SampleEventResponse)
|
||||
async def create_sample_event(
|
||||
sample_id: int, event: SampleEventCreate, db: Session = Depends(get_db)
|
||||
):
|
||||
# Ensure the sample exists
|
||||
sample = db.query(SampleModel).filter(SampleModel.id == sample_id).first()
|
||||
if not sample:
|
||||
raise HTTPException(status_code=404, detail="Sample not found")
|
||||
|
||||
# Create the event
|
||||
sample_event = SampleEventModel(
|
||||
sample_id=sample_id,
|
||||
event_type=event.event_type,
|
||||
timestamp=datetime.now(), # Use the current timestamp
|
||||
)
|
||||
db.add(sample_event)
|
||||
db.commit()
|
||||
db.refresh(sample_event)
|
||||
|
||||
return (
|
||||
sample_event # Response will automatically use the SampleEventResponse schema
|
||||
)
|
||||
|
||||
|
||||
# Route to fetch the last (most recent) sample event
|
||||
@router.get("/samples/{sample_id}/events/last", response_model=SampleEventResponse)
|
||||
async def get_last_sample_event(sample_id: int, db: Session = Depends(get_db)):
|
||||
# Ensure the sample exists
|
||||
sample = db.query(SampleModel).filter(SampleModel.id == sample_id).first()
|
||||
if not sample:
|
||||
raise HTTPException(status_code=404, detail="Sample not found")
|
||||
|
||||
# Get the most recent event for the sample
|
||||
last_event = (
|
||||
db.query(SampleEventModel)
|
||||
.filter(SampleEventModel.sample_id == sample_id)
|
||||
.order_by(SampleEventModel.timestamp.desc())
|
||||
.first()
|
||||
)
|
||||
|
||||
if not last_event:
|
||||
raise HTTPException(status_code=404, detail="No events found for the sample")
|
||||
|
||||
return last_event # Response will automatically use the SampleEventResponse schema
|
||||
|
@ -83,6 +83,16 @@ class SampleEventCreate(BaseModel):
|
||||
event_type: str
|
||||
|
||||
|
||||
class SampleEventResponse(BaseModel):
|
||||
id: int
|
||||
sample_id: int
|
||||
event_type: str
|
||||
timestamp: datetime
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
|
||||
|
||||
class Results(BaseModel):
|
||||
# Define attributes for Results here
|
||||
pass
|
||||
|
Reference in New Issue
Block a user