Files
stand/stand/api.py
T
2026-06-06 12:00:15 +02:00

57 lines
1.7 KiB
Python

from datetime import datetime
from http import HTTPStatus
from typing import Any
from fastapi import APIRouter, HTTPException, Response
from state import grids, adb
from utils.annotations import Beamline, PGroup
router = APIRouter()
@router.get("/pgroups/{pgroup}")
def get_pgroup(beamline: Beamline, pgroup: PGroup, orient: str = None):
content = adb.get(beamline).get_data(pgroup).to_json(orient=orient)
return Response(content=content, media_type="application/json")
@router.delete("/pgroups/{pgroup}/runs")
def clear_pgroup(beamline: Beamline, pgroup: PGroup):
adb.get(beamline).clear_data(pgroup)
for grid in grids[pgroup]:
grid.clear()
@router.put("/pgroups/{pgroup}/runs/{run}/entries/{entry}")
def set_entry(beamline: Beamline, pgroup: PGroup, run: int, entry: str, value: Any):
adb.get(beamline).set_cell(pgroup, run, entry, value)
for grid in grids[pgroup]:
grid.set_cell(run, entry, value)
@router.post("/pgroups/{pgroup}/runs/{run}")
def append_run(beamline: Beamline, pgroup: PGroup, run: int, entries: dict[str, Any]):
lib = adb.get(beamline)
run_exists = lib.has_index(pgroup, run)
if run_exists:
raise HTTPException(HTTPStatus.CONFLICT, f"run {run} exists already in {beamline}/{pgroup}")
timestamp = entries.pop("timestamp", None)
timestamp = datetime.fromisoformat(timestamp) if timestamp else datetime.now()
entries = {"timestamp": timestamp, **entries} # setdefault would not force timestamp to be the first column
lib.append_row(pgroup, run, entries)
entries = {"run": run, **entries} # setdefault would not force run to be the first column
for grid in grids[pgroup]:
grid.append(entries)