feat: implement put method for aerotech controller
CI for pxii_bec / test (push) Failing after 2m27s
CI for pxii_bec / test (push) Failing after 2m27s
This commit is contained in:
@@ -1,14 +1,19 @@
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd_devices import PSIDeviceBase
|
||||
|
||||
from .http import TIMESTAMP_ID, HttpDeviceController, HttpDeviceSignal, HttpOphydDevice
|
||||
|
||||
|
||||
class AerotechController(HttpDeviceController):
|
||||
_readback_endpoint = "status"
|
||||
_target_endpoint = "position"
|
||||
|
||||
def __init__(self, *, prefix, **kwargs):
|
||||
self._readbacks: dict[str, dict[str, float | bool]] = {}
|
||||
super().__init__(prefix=prefix, **kwargs)
|
||||
|
||||
def put(self, axis: str, val: float):
|
||||
self._rest_post(body={axis: val})
|
||||
|
||||
def get_readback(self, axis_id: str) -> tuple[float, float] | None:
|
||||
with self._readback_lock:
|
||||
if axis_id not in self._readbacks or TIMESTAMP_ID not in self._readbacks:
|
||||
@@ -23,6 +28,7 @@ class Aerotech(HttpOphydDevice):
|
||||
y = Cpt(HttpDeviceSignal, axis_identifier="y", tolerance=0.01)
|
||||
z = Cpt(HttpDeviceSignal, axis_identifier="z", tolerance=0.01)
|
||||
u = Cpt(HttpDeviceSignal, axis_identifier="u", tolerance=0.01)
|
||||
vel_u_deg_s = Cpt(HttpDeviceSignal, axis_identifier="vel_u_deg_s", tolerance=0.01)
|
||||
|
||||
|
||||
def _test():
|
||||
|
||||
@@ -86,10 +86,15 @@ class HttpDeviceController(OphydObject, ABC):
|
||||
raise HttpRestError(resp)
|
||||
return resp.json()
|
||||
|
||||
def _rest_put(self, val: dict[str, float]):
|
||||
resp = self._session.put(self._prefix + self._target_endpoint, params=val)
|
||||
def _rest_put(self, params: dict | None = None, body: dict | None = None):
|
||||
resp = self._session.put(self._prefix + self._target_endpoint, params=params)
|
||||
if not resp.ok:
|
||||
raise HttpRestError(resp, value=val)
|
||||
raise HttpRestError(resp, value=params)
|
||||
|
||||
def _rest_post(self, params: dict | None = None, body: dict | None = None):
|
||||
resp = self._session.post(self._prefix + self._target_endpoint, params=params, data=body)
|
||||
if not resp.ok:
|
||||
raise HttpRestError(resp, value=params)
|
||||
|
||||
def start_monitor(self):
|
||||
"""Start or restart the automonitor thread."""
|
||||
|
||||
@@ -24,12 +24,12 @@ class SmargonController(HttpDeviceController):
|
||||
return self._readbacks.get(axis_id), self._readbacks.get(_TIMESTAMP_ID) # type: ignore
|
||||
|
||||
def put(self, axis: str, val: float):
|
||||
self._rest_put({axis: val})
|
||||
self._rest_put(params={axis: val})
|
||||
|
||||
def stop(self):
|
||||
# There doesn't appear to be a stop endpoint on the server
|
||||
# Best effort: set the target to the current position
|
||||
self._rest_put(self._readbacks)
|
||||
self._rest_put(params=self._readbacks)
|
||||
|
||||
|
||||
class Smargon(PSIDeviceBase):
|
||||
|
||||
Reference in New Issue
Block a user