fix(controller): add configurable timeout, en/disable controller axes on on/off

This commit is contained in:
2025-11-25 09:34:06 +01:00
committed by Christian Appel
parent 69f7a353cf
commit 2fb64e995e
6 changed files with 176 additions and 46 deletions

View File

@@ -3,8 +3,13 @@ from unittest import mock
from ophyd_devices.utils.controller import Controller
def test_controller_off():
controller = Controller(socket_cls=mock.MagicMock(), socket_host="dummy", socket_port=123)
def test_controller_off(dm_with_devices):
controller = Controller(
socket_cls=mock.MagicMock(),
socket_host="dummy",
socket_port=123,
device_manager=dm_with_devices,
)
controller.on()
with mock.patch.object(controller.sock, "close") as mock_close:
controller.off()
@@ -17,10 +22,12 @@ def test_controller_off():
controller._reset_controller()
def test_controller_on():
def test_controller_on(dm_with_devices):
socket_cls = mock.MagicMock()
Controller._controller_instances = {}
controller = Controller(socket_cls=socket_cls, socket_host="dummy", socket_port=123)
controller = Controller(
socket_cls=socket_cls, socket_host="dummy", socket_port=123, device_manager=dm_with_devices
)
controller.on()
assert controller.sock is not None
assert controller.connected is True
@@ -30,3 +37,39 @@ def test_controller_on():
controller.on()
socket_cls().open.assert_called_once()
controller._reset_controller()
def test_controller_with_multiple_axes(dm_with_devices):
"""Test that turning the controller on and off enables/disables all axes attached to it."""
socket_cls = mock.MagicMock()
Controller._controller_instances = {}
Controller._axes_per_controller = 2
controller = Controller(
socket_cls=socket_cls, socket_host="dummy", socket_port=123, device_manager=dm_with_devices
)
with mock.patch.object(controller.dm, "config_helper") as mock_config_helper:
# Disable samx, samy first
dm_with_devices.devices.get("samx").enabled = False
dm_with_devices.devices.get("samy").enabled = False
# Set axes on the controller
controller.set_axis(axis=dm_with_devices.devices["samx"], axis_nr=0)
controller.set_axis(axis=dm_with_devices.devices["samy"], axis_nr=1)
# Turn the controller on, should turn the controller on, but not enable the axes
controller.on()
assert dm_with_devices.devices.get("samx").enabled is False
assert dm_with_devices.devices.get("samy").enabled is False
assert controller.connected is True
controller.set_all_devices_enable(True)
assert dm_with_devices.devices.get("samx").enabled is True
assert dm_with_devices.devices.get("samy").enabled is True
# Disable one axis after another, the last one should turn the controller off
controller.set_device_enable("samx", False)
assert controller.connected is True
assert dm_with_devices.devices.get("samx").enabled is False
assert dm_with_devices.devices.get("samy").enabled is True
controller.set_device_enable("samy", False)
assert dm_with_devices.devices.get("samy").enabled is False
# Enabling one axis should turn the controller back on
assert controller.connected is False
controller.set_device_enable("samx", True)
assert controller.connected is True

View File

@@ -1,4 +1,7 @@
import socket
from unittest import mock
import pytest
from ophyd_devices.utils.socket import SocketIO
@@ -62,6 +65,21 @@ def test_open():
assert socketio.sock.port == socketio.port
def test_socket_open_with_timeout():
dsocket = DummySocket()
socketio = SocketIO("localhost", 8080)
socketio.sock = dsocket
with mock.patch.object(dsocket, "connect") as mock_connect:
socketio.open(timeout=0.1)
mock_connect.assert_called_once()
mock_connect.reset_mock()
# There is a 1s sleep in the retry loop, mock_connect should be called only once
mock_connect.side_effect = Exception("Connection failed")
with pytest.raises(ConnectionError):
socketio.open(timeout=0.4)
mock_connect.assert_called_once()
def test_close():
socketio = SocketIO("localhost", 8080)
socketio.close()