
Introduced `ImageCreate` and `Image` models to handle image-related data in the backend. Improved the organization and readability of the testing notebook by consolidating and formatting code into distinct sections with markdown cells.
168 lines
5.3 KiB
Python
168 lines
5.3 KiB
Python
from fastapi import APIRouter, HTTPException, Depends, UploadFile, File, Form
|
|
from sqlalchemy.orm import Session
|
|
from pathlib import Path
|
|
from typing import List
|
|
from datetime import datetime
|
|
import shutil
|
|
from app.schemas import (
|
|
Puck as PuckSchema,
|
|
Sample as SampleSchema,
|
|
SampleEventCreate,
|
|
Sample,
|
|
Image,
|
|
ImageCreate,
|
|
)
|
|
from app.models import (
|
|
Puck as PuckModel,
|
|
Sample as SampleModel,
|
|
SampleEvent as SampleEventModel,
|
|
Image as ImageModel,
|
|
)
|
|
from app.dependencies import get_db
|
|
import logging
|
|
from sqlalchemy.orm import joinedload
|
|
|
|
# Router on which the sample/puck endpoints defined below in this module
# are registered via the @router.get / @router.post decorators.
router = APIRouter()
|
|
|
|
|
|
@router.get("/{puck_id}/samples", response_model=List[SampleSchema])
async def get_samples_with_events(puck_id: int, db: Session = Depends(get_db)):
    """Return all samples of a puck, each with its events loaded.

    Args:
        puck_id: Identifier of the puck whose samples are requested.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        The list of sample rows whose ``puck_id`` matches; the list may be
        empty when the puck exists but holds no samples.

    Raises:
        HTTPException: 404 when no puck with ``puck_id`` exists.
    """
    puck = db.query(PuckModel).filter(PuckModel.id == puck_id).first()
    if not puck:
        raise HTTPException(status_code=404, detail="Puck not found")

    # Eager-load the events together with the samples. The previous version
    # issued one additional SELECT per sample (N+1 queries) and re-assigned the
    # relationship collection on every row; a joined load fetches the same
    # data in a single round trip, as the pucks-samples endpoint already does.
    return (
        db.query(SampleModel)
        .options(joinedload(SampleModel.events))
        .filter(SampleModel.puck_id == puck_id)
        .all()
    )
|
|
|
|
|
|
@router.get("/pucks-samples", response_model=List[PuckSchema])
async def get_all_pucks_with_samples_and_events(db: Session = Depends(get_db)):
    """Fetch every puck with its samples, the samples' events and puck events.

    Args:
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        All puck rows, with the ``samples`` -> ``events`` chain and the puck's
        own ``events`` relationship eagerly loaded.

    Raises:
        HTTPException: 404 when the query yields no pucks at all.
    """
    logging.info("Fetching all pucks with samples and events")

    puck_query = db.query(PuckModel)

    # Loader for the nested relationship: puck -> samples -> events.
    nested_sample_events = joinedload(PuckModel.samples).joinedload(
        SampleModel.events
    )
    # Loader for the events relationship declared on the puck itself.
    direct_puck_events = joinedload(PuckModel.events)

    all_pucks = puck_query.options(nested_sample_events, direct_puck_events).all()

    if all_pucks:
        return all_pucks

    raise HTTPException(status_code=404, detail="No pucks found in the database")
|
|
|
|
|
|
# Endpoint that records a new event against an existing sample
@router.post("/samples/{sample_id}/events", response_model=Sample)
async def create_sample_event(
    sample_id: int, event: SampleEventCreate, db: Session = Depends(get_db)
):
    """Store a new event for a sample and return the sample with its events.

    Args:
        sample_id: Identifier of the sample the event belongs to.
        event: Request payload; only its ``event_type`` is read here.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        The sample row, with ``events`` set to all events currently stored
        for it (including the one just created). It is serialized through the
        ``Sample`` response model.

    Raises:
        HTTPException: 404 when no sample with ``sample_id`` exists.
    """
    # Guard: the event must reference a sample that is actually present.
    target_sample = (
        db.query(SampleModel).filter(SampleModel.id == sample_id).first()
    )
    if not target_sample:
        raise HTTPException(status_code=404, detail="Sample not found")

    # The timestamp is taken server-side at creation time, not from the payload.
    event_fields = {
        "sample_id": sample_id,
        "event_type": event.event_type,
        "timestamp": datetime.now(),
    }
    created_event = SampleEventModel(**event_fields)

    # Persist the event and reload it so database-generated values are present.
    db.add(created_event)
    db.commit()
    db.refresh(created_event)

    # Re-read the full event list so the response reflects the new event.
    events_query = db.query(SampleEventModel).filter(
        SampleEventModel.sample_id == sample_id
    )
    target_sample.events = events_query.all()

    return target_sample
|
|
|
|
|
|
def _safe_upload_name(raw_name):
    """Reduce a client-supplied file name to a bare, non-traversing name.

    Directory components (POSIX or Windows style) are discarded so the result
    can never escape the directory it is joined onto. Returns an empty string
    when nothing usable remains (missing name, ``.``, ``..``, only separators).
    """
    # Normalise Windows separators so "..\\x" is split the same way as "../x".
    candidate = Path((raw_name or "").replace("\\", "/")).name
    return "" if candidate in {".", ".."} else candidate


@router.post("/{sample_id}/upload-images", response_model=Image)
async def upload_sample_image(
    sample_id: int,
    uploaded_file: UploadFile = File(...),
    comment: str = Form(None),
    db: Session = Depends(get_db),
):
    """Store an uploaded image on disk and register it for a sample.

    The file is written to
    ``images/<pgroup>/<YYYY-MM-DD>/<dewar_name>/<puck_name>/<position>/<name>``
    and a matching image record is inserted into the database.

    Args:
        sample_id: Identifier of the sample the image belongs to.
        uploaded_file: The multipart file upload; must declare an ``image/*``
            content type.
        comment: Optional free-text comment stored with the image record.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        The newly created image row, serialized through the ``Image``
        response model.

    Raises:
        HTTPException: 404 when the sample does not exist; 400 when the
            content type is not an image, the file name is missing/unusable,
            or the sample has no puck/dewar to derive the pgroup from; 500
            when the file cannot be written.
    """
    logging.info(f"Received file: {uploaded_file.filename}")

    # Validate Sample
    sample = db.query(SampleModel).filter(SampleModel.id == sample_id).first()
    if not sample:
        raise HTTPException(status_code=404, detail="Sample not found")

    # Validate the MIME type before touching the filesystem, so a rejected
    # upload leaves no empty directories behind. The content type may be
    # absent, which must be rejected rather than crash on ``.startswith``.
    if not (uploaded_file.content_type or "").startswith("image/"):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file type: {uploaded_file.filename}."
            f" Only images are accepted.",
        )

    # The file name is untrusted client input: strip any directory parts so
    # it cannot point outside the target directory (path traversal).
    safe_name = _safe_upload_name(uploaded_file.filename)
    if not safe_name:
        raise HTTPException(status_code=400, detail="Invalid or missing file name.")

    # The pgroup is read from the dewar, so both puck and dewar are required.
    # Previously a missing puck/dewar surfaced as an AttributeError (HTTP 500)
    # before the name fallbacks further down could ever apply.
    puck = sample.puck
    dewar = puck.dewar if puck else None
    if not dewar:
        raise HTTPException(
            status_code=400,
            detail="Sample is not assigned to a puck and dewar;"
            " cannot determine the pgroup for the image.",
        )

    # Define Directory Structure
    pgroup = dewar.pgroups  # adjust to sample or puck pgroups as needed
    today = datetime.now().strftime("%Y-%m-%d")
    dewar_name = dewar.dewar_name
    puck_name = puck.puck_name
    position = sample.position if sample.position else "default_position"
    base_dir = Path(f"images/{pgroup}/{today}/{dewar_name}/{puck_name}/{position}")
    base_dir.mkdir(parents=True, exist_ok=True)

    file_path = base_dir / safe_name
    logging.debug(f"Saving file {uploaded_file.filename} to {file_path}")

    try:
        with file_path.open("wb") as buffer:
            shutil.copyfileobj(uploaded_file.file, buffer)
        logging.info(f"File saved: {file_path}")
    except Exception as e:
        logging.error(f"Error saving file {uploaded_file.filename}: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Could not save file {uploaded_file.filename}."
            f" Ensure the server has correct permissions.",
        ) from e

    # Create the payload from the Pydantic schema so it is validated before
    # being handed to the ORM model.
    image_payload = ImageCreate(
        pgroup=pgroup,
        comment=comment,
        filepath=str(file_path),
        status="active",
        sample_id=sample_id,
    ).dict()

    # Convert the payload to the mapped SQLAlchemy model instance and persist it.
    new_image = ImageModel(**image_payload)
    db.add(new_image)
    db.commit()
    db.refresh(new_image)

    logging.info(
        f"Uploaded 1 file for sample {sample_id} and"
        f" added record {new_image.id} to the database."
    )
    # Returning the mapped SQLAlchemy object, which will be converted to the
    # Pydantic response model.
    return new_image
|