feat: add aerotech device
CI for pxii_bec / test (push) Successful in 33s

This commit is contained in:
2026-05-06 18:07:28 +02:00
parent 3df80d699a
commit fc6c7d95c1
2 changed files with 101 additions and 0 deletions
+43
View File
@@ -0,0 +1,43 @@
from ophyd import Component as Cpt
from .http import TIMESTAMP_ID, HttpDeviceController, HttpDeviceSignal, HttpOphydDevice
class AerotechController(HttpDeviceController):
_readback_endpoint = "status"
_target_endpoint = "position"
def __init__(self, *, prefix, **kwargs):
self._readbacks: dict[str, dict[str, float | bool]] = {}
super().__init__(prefix=prefix, **kwargs)
def put(self, axis: str, val: float):
self._rest_post(body={axis: val})
def get_readback(self, axis_id: str) -> tuple[float, float] | None:
with self._readback_lock:
if axis_id not in self._readbacks or TIMESTAMP_ID not in self._readbacks:
return None
return self._readbacks.get(axis_id)["pos"], self._readbacks.get(TIMESTAMP_ID) # type: ignore
class Aerotech(HttpOphydDevice):
controller_class = AerotechController
x = Cpt(HttpDeviceSignal, axis_identifier="x", tolerance=0.01)
y = Cpt(HttpDeviceSignal, axis_identifier="y", tolerance=0.01)
z = Cpt(HttpDeviceSignal, axis_identifier="z", tolerance=0.01)
u = Cpt(HttpDeviceSignal, axis_identifier="u", tolerance=0.01)
vel_u_deg_s = Cpt(HttpDeviceSignal, axis_identifier="vel_u_deg_s", tolerance=0.01)
def _test():
a = Aerotech(name="aerotech", prefix="http://mx-x10sa-queue-01:5234")
a.wait_for_connection()
return a
if __name__ == "__main__":
aerotech = _test()
print(aerotech.read())
aerotech.stop()
+58
View File
@@ -0,0 +1,58 @@
from copy import copy
from threading import RLock
from unittest.mock import ANY
import pytest
class MockServer:
def __init__(self) -> None:
self.lock = RLock()
self.mock_data = {
"x": {"pos": 1.0},
"y": {"pos": 1.0},
"z": {"pos": 1.0},
"u": {"pos": 1.0},
"vel_u_deg_s": {"pos": 1.0},
}
def get(self, endpoint):
with self.lock:
return copy(self.mock_data)
def put(self, params: dict | None = None, body: dict | None = None):
with self.lock:
assert body is not None
for k, v in body.items():
self.mock_data[k]["pos"] = v
@pytest.fixture
def aerotech():
mock_server = MockServer()
from pxii_bec.devices.aerotech import Aerotech
s = Aerotech(name="aerotech", prefix="http://test-aerotech.psi.ch")
s.controller._rest_get = mock_server.get
s.controller._rest_post = mock_server.put
yield s
s.controller._stop_monitor_readback_event.set()
class TestAerotech:
def test_aerotech_read(self, aerotech):
aerotech.wait_for_connection()
reading = aerotech.read()
assert dict(reading) == {
"aerotech_x": {"value": 1.0, "timestamp": ANY},
"aerotech_y": {"value": 1.0, "timestamp": ANY},
"aerotech_z": {"value": 1.0, "timestamp": ANY},
"aerotech_u": {"value": 1.0, "timestamp": ANY},
"aerotech_vel_u_deg_s": {"value": 1.0, "timestamp": ANY},
}
def test_aerotech_set_with_status(self, aerotech):
aerotech.wait_for_connection()
st = aerotech.x.set(5.0)
st.wait(timeout=1)
assert aerotech.x.get() == 5.0