mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2026-05-03 18:44:14 +02:00
feat: add HTTP REST signal
This commit is contained in:
@@ -0,0 +1,58 @@
|
||||
from typing import Any
|
||||
|
||||
from ophyd.utils.errors import ReadOnlyError
|
||||
from requests import Response, get, put
|
||||
|
||||
from ophyd_devices.utils.socket import SocketSignal
|
||||
|
||||
|
||||
class HttpRestError(Exception):
|
||||
"""Error for rest calls from a HttpRestSignal."""
|
||||
|
||||
def __init__(self, resp: Response, *args: object, value: Any | None = None) -> None:
|
||||
method, url = resp.request.method, resp.request.url
|
||||
data = f"{str(value)} to " if value is not None else ""
|
||||
super().__init__(
|
||||
f"Could not {method} {data}{url}. Code: {resp.status_code}. Reason: {resp.reason}.",
|
||||
*args,
|
||||
)
|
||||
|
||||
|
||||
class HttpRestSignal(SocketSignal):
|
||||
"""Ophyd signal which gets and puts to a REST API rather than EPICS PVs."""
|
||||
|
||||
def __init__(self, *args, get_uri: str = "", put_uri: str | None = None, **kwargs):
|
||||
self._get_uri = get_uri
|
||||
self._put_uri = put_uri or get_uri
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def _get_request_transform(self, uri: str):
|
||||
"""Hook to apply to the GET request before creating the request"""
|
||||
return get(uri)
|
||||
|
||||
def _put_request_transform(self, uri: str, val: Any):
|
||||
"""Hook to apply to the PUT request before creating the request"""
|
||||
return put(uri, val)
|
||||
|
||||
def _socket_get(self):
|
||||
resp = self._get_request_transform(self._get_uri)
|
||||
if not resp.ok:
|
||||
raise HttpRestError(resp)
|
||||
self._readback = resp.text
|
||||
return self._readback
|
||||
|
||||
def _socket_set(self, val: Any):
|
||||
resp = self._put_request_transform(self._put_uri, val)
|
||||
if not resp.ok:
|
||||
raise HttpRestError(resp, value=val)
|
||||
|
||||
|
||||
class HttpRestSignalRO(HttpRestSignal):
|
||||
"""Read-only version of HttpRestSignal"""
|
||||
|
||||
def __init__(self, *args, get_uri: str = "", **kwargs):
|
||||
self._get_uri = get_uri
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def _socket_set(self, val):
|
||||
raise ReadOnlyError(f"HttpRestSignalRO {self.name} is read-only!")
|
||||
@@ -85,7 +85,7 @@ class SocketSignal(abc.ABC, Signal):
|
||||
SUB_SETPOINT = "setpoint"
|
||||
|
||||
@abc.abstractmethod
|
||||
def _socket_get(self): ...
|
||||
def _socket_get(self) -> typing.Any: ...
|
||||
|
||||
@abc.abstractmethod
|
||||
def _socket_set(self, val): ...
|
||||
|
||||
@@ -37,6 +37,7 @@ dev = [
|
||||
"coverage~=7.0",
|
||||
"pylint~=3.0",
|
||||
"pytest-random-order~=1.1",
|
||||
"requests-mock",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
@@ -0,0 +1,113 @@
|
||||
from typing import Any
|
||||
from unittest.mock import ANY
|
||||
|
||||
import pytest
|
||||
import requests_mock
|
||||
from requests import Request, get, put
|
||||
|
||||
from ophyd_devices.utils.http_signal import HttpRestError, HttpRestSignal
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_server():
|
||||
with requests_mock.Mocker() as m:
|
||||
mock_data = "data"
|
||||
|
||||
def get_cb(request, context):
|
||||
nonlocal mock_data
|
||||
return mock_data
|
||||
|
||||
def put_cb(request, context):
|
||||
nonlocal mock_data
|
||||
mock_data = request.text
|
||||
|
||||
def query_param_put_cb(request: Request, context):
|
||||
nonlocal mock_data
|
||||
pos = request.qs.get("position")
|
||||
if len(pos) != 1:
|
||||
context.reason = "wrong number of params"
|
||||
context.status_code = 422
|
||||
return
|
||||
pos = pos[0]
|
||||
if pos_valid(pos):
|
||||
context.reason = ""
|
||||
context.status_code = 202
|
||||
mock_data = pos
|
||||
else:
|
||||
context.reason = "out of range"
|
||||
context.status_code = 422
|
||||
|
||||
def put_req_valid(request):
|
||||
return pos_valid(request.text)
|
||||
|
||||
def pos_valid(val):
|
||||
try:
|
||||
val = int(val)
|
||||
except:
|
||||
return False
|
||||
return -50 < val < 50
|
||||
|
||||
def put_can_fail_cb(request, context):
|
||||
|
||||
context.reason = "" if put_req_valid(request) else "out of range"
|
||||
context.status_code = 202 if put_req_valid(request) else 422
|
||||
|
||||
m.get("http://test.psi.ch/get_data", text=get_cb)
|
||||
m.put("http://test.psi.ch/put_data", text=put_cb)
|
||||
|
||||
m.get("http://test.psi.ch/bad_get_endpoint", status_code=404, reason="test not found")
|
||||
m.put("http://test.psi.ch/put_can_fail", text=put_can_fail_cb)
|
||||
|
||||
m.put("http://test.psi.ch/transform", text=query_param_put_cb)
|
||||
m.get("http://test.psi.ch/transform", text=get_cb)
|
||||
|
||||
yield requests_mock
|
||||
|
||||
|
||||
def test_signal_get():
|
||||
sig = HttpRestSignal(name="get", get_uri="http://test.psi.ch/get_data")
|
||||
assert sig.read() == {"get": {"timestamp": ANY, "value": "data"}}
|
||||
|
||||
|
||||
def test_signal_put():
|
||||
sig = HttpRestSignal(
|
||||
name="put_get", get_uri="http://test.psi.ch/get_data", put_uri="http://test.psi.ch/put_data"
|
||||
)
|
||||
assert sig.read() == {"put_get": {"timestamp": ANY, "value": "data"}}
|
||||
sig.put("test_value")
|
||||
assert sig.read() == {"put_get": {"timestamp": ANY, "value": "test_value"}}
|
||||
|
||||
|
||||
def test_bad_signal_get():
|
||||
sig = HttpRestSignal(name="get", get_uri="http://test.psi.ch/bad_get_endpoint")
|
||||
with pytest.raises(HttpRestError) as e:
|
||||
sig.read()
|
||||
assert e.match("test not found")
|
||||
|
||||
|
||||
def test_bad_signal_put():
|
||||
sig = HttpRestSignal(name="get", get_uri="http://test.psi.ch/put_can_fail")
|
||||
sig.put("20")
|
||||
|
||||
with pytest.raises(HttpRestError) as e:
|
||||
sig.put("51")
|
||||
assert e.match("Could not PUT 51")
|
||||
assert e.match("Code: 422. Reason: out of range.")
|
||||
|
||||
|
||||
class PutQueryParamsSignal(HttpRestSignal):
|
||||
def _get_request_transform(self, uri: str):
|
||||
return get(uri + "transform")
|
||||
|
||||
def _put_request_transform(self, uri: str, val: Any, **kwargs):
|
||||
return put(uri + "transform", params={"position": val})
|
||||
|
||||
|
||||
def test_put_args_in_params():
|
||||
sig = PutQueryParamsSignal(name="transformed", get_uri="http://test.psi.ch/")
|
||||
reading = sig.read()
|
||||
assert reading.get("transformed").get("value") == "data"
|
||||
|
||||
sig.put("20")
|
||||
reading = sig.read()
|
||||
assert reading.get("transformed").get("value") == "20"
|
||||
Reference in New Issue
Block a user