From fc6c7d95c17a16ea17311b811efd092f99915bf0 Mon Sep 17 00:00:00 2001 From: perl_d Date: Wed, 6 May 2026 18:07:28 +0200 Subject: [PATCH] feat: add aerotech device --- pxii_bec/devices/aerotech.py | 43 +++++++++++++++++++++ tests/tests_devices/test_aerotech.py | 58 ++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+) create mode 100644 pxii_bec/devices/aerotech.py create mode 100644 tests/tests_devices/test_aerotech.py diff --git a/pxii_bec/devices/aerotech.py b/pxii_bec/devices/aerotech.py new file mode 100644 index 0000000..d465e59 --- /dev/null +++ b/pxii_bec/devices/aerotech.py @@ -0,0 +1,43 @@ +from ophyd import Component as Cpt + +from .http import TIMESTAMP_ID, HttpDeviceController, HttpDeviceSignal, HttpOphydDevice + + +class AerotechController(HttpDeviceController): + _readback_endpoint = "status" + _target_endpoint = "position" + + def __init__(self, *, prefix, **kwargs): + self._readbacks: dict[str, dict[str, float | bool]] = {} + super().__init__(prefix=prefix, **kwargs) + + def put(self, axis: str, val: float): + self._rest_post(body={axis: val}) + + def get_readback(self, axis_id: str) -> tuple[float, float] | None: + with self._readback_lock: + if axis_id not in self._readbacks or TIMESTAMP_ID not in self._readbacks: + return None + return self._readbacks.get(axis_id)["pos"], self._readbacks.get(TIMESTAMP_ID) # type: ignore + + +class Aerotech(HttpOphydDevice): + controller_class = AerotechController + + x = Cpt(HttpDeviceSignal, axis_identifier="x", tolerance=0.01) + y = Cpt(HttpDeviceSignal, axis_identifier="y", tolerance=0.01) + z = Cpt(HttpDeviceSignal, axis_identifier="z", tolerance=0.01) + u = Cpt(HttpDeviceSignal, axis_identifier="u", tolerance=0.01) + vel_u_deg_s = Cpt(HttpDeviceSignal, axis_identifier="vel_u_deg_s", tolerance=0.01) + + +def _test(): + a = Aerotech(name="aerotech", prefix="http://mx-x10sa-queue-01:5234") + a.wait_for_connection() + return a + + +if __name__ == "__main__": + aerotech = _test() + print(aerotech.read()) + aerotech.stop() diff --git a/tests/tests_devices/test_aerotech.py b/tests/tests_devices/test_aerotech.py new file mode 100644 index 0000000..49e2fc2 --- /dev/null +++ b/tests/tests_devices/test_aerotech.py @@ -0,0 +1,58 @@ +from copy import copy +from threading import RLock +from unittest.mock import ANY + +import pytest + + +class MockServer: + def __init__(self) -> None: + self.lock = RLock() + self.mock_data = { + "x": {"pos": 1.0}, + "y": {"pos": 1.0}, + "z": {"pos": 1.0}, + "u": {"pos": 1.0}, + "vel_u_deg_s": {"pos": 1.0}, + } + + def get(self, endpoint): + with self.lock: + return copy(self.mock_data) + + def put(self, params: dict | None = None, body: dict | None = None): + with self.lock: + assert body is not None + for k, v in body.items(): + self.mock_data[k]["pos"] = v + + +@pytest.fixture +def aerotech(): + mock_server = MockServer() + from pxii_bec.devices.aerotech import Aerotech + + s = Aerotech(name="aerotech", prefix="http://test-aerotech.psi.ch") + s.controller._rest_get = mock_server.get + s.controller._rest_post = mock_server.put + yield s + s.controller._stop_monitor_readback_event.set() + + +class TestAerotech: + def test_aerotech_read(self, aerotech): + aerotech.wait_for_connection() + reading = aerotech.read() + assert dict(reading) == { + "aerotech_x": {"value": 1.0, "timestamp": ANY}, + "aerotech_y": {"value": 1.0, "timestamp": ANY}, + "aerotech_z": {"value": 1.0, "timestamp": ANY}, + "aerotech_u": {"value": 1.0, "timestamp": ANY}, + "aerotech_vel_u_deg_s": {"value": 1.0, "timestamp": ANY}, + } + + def test_aerotech_set_with_status(self, aerotech): + aerotech.wait_for_connection() + st = aerotech.x.set(5.0) + st.wait(timeout=1) + assert aerotech.x.get() == 5.0