# Mirror of https://github.com/bec-project/ophyd_devices.git
# Synced 2026-04-21 13:24:36 +02:00
# 214 lines, 7.0 KiB, Python
import socket
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
from bec_server.device_server.tests.utils import DMMock
|
|
|
|
from ophyd_devices.tests.utils import SocketMock
|
|
from ophyd_devices.utils.controller import Controller
|
|
from ophyd_devices.utils.socket import SocketIO, SocketSignal
|
|
|
|
|
|
class DummySocketSignal(SocketSignal):
    """Minimal SocketSignal subclass used to exercise the SocketSignal interface.

    Both the read and the write hook are forwarded to
    ``controller.socket_put_and_receive``.
    """

    def __init__(self, name, controller: Controller, **kwargs):
        """Initialise the parent signal, then keep a reference to the controller.

        Args:
            name: Signal name, passed on to the parent constructor.
            controller: Object whose ``socket_put_and_receive`` performs the I/O.
            **kwargs: Forwarded unchanged to the parent constructor.
        """
        super().__init__(name=name, **kwargs)
        self.controller = controller

    def _socket_get(self) -> str:
        """Send the literal command ``"get"`` via the controller and return the reply."""
        reply = self.controller.socket_put_and_receive("get")
        return reply

    def _socket_set(self, value: str):
        """Send ``value`` via the controller; the reply is discarded."""
        self.controller.socket_put_and_receive(value)
|
|
|
|
|
|
@pytest.fixture
def controller():
    """Yield a switched-on Controller that uses SocketMock and a DMMock device manager.

    ``Controller._reset_controller()`` is called before the instance is built
    and again on teardown; the teardown call also runs when setup or the test
    body raises.
    """
    try:
        device_manager = DMMock()
        Controller._reset_controller()
        init_kwargs = {
            "name": "controller",
            "socket_cls": SocketMock,
            "socket_host": "localhost",
            "socket_port": 8080,
            "device_manager": device_manager,
        }
        ctrl = Controller(**init_kwargs)
        ctrl.on()
        yield ctrl
    finally:
        Controller._reset_controller()
|
|
|
|
|
|
@pytest.fixture
def signal(controller):
    """Return a DummySocketSignal named "signal" with auto_monitor on, bound to ``controller``."""
    signal_kwargs = {"name": "signal", "auto_monitor": True, "controller": controller}
    return DummySocketSignal(**signal_kwargs)
|
|
|
|
|
|
class DummySocket:
    """In-memory stand-in for a socket that records how it was used.

    The instance doubles as its own factory: ``socket()`` stores the requested
    family/kind and returns ``self``. No network I/O is performed; every call
    only updates attributes that a test can inspect afterwards.
    """

    # Same numeric values the tests compare against socket.AF_INET / SOCK_STREAM.
    AF_INET = 2
    SOCK_STREAM = 1

    def __init__(self) -> None:
        """Create the dummy with every recorded attribute in a defined state."""
        self.address_family = None
        self.socket_kind = None
        self.timeout = None
        # These were previously created only by send()/connect()/close(), so
        # reading them on a fresh instance raised AttributeError.
        self.send_buffer = None
        self.host = None
        self.port = None
        self.connected = False

    def socket(self, address_family, socket_kind):
        """Record the requested family and kind and return this instance."""
        self.address_family = address_family
        self.socket_kind = socket_kind
        return self

    def settimeout(self, timeout):
        """Record the requested timeout."""
        self.timeout = timeout

    def send(self, msg, *args, **kwargs):
        """Record ``msg`` as the most recently sent payload; extra arguments are ignored."""
        self.send_buffer = msg

    def connect(self, address):
        """Record ``address[0]`` as host and ``address[1]`` as port, and mark as connected."""
        self.host = address[0]
        self.port = address[1]
        self.connected = True

    def close(self):
        """Mark the dummy as disconnected."""
        self.connected = False
|
|
|
|
|
|
def test_socket_init():
    """A new SocketIO stores host and port, starts closed, and holds an AF_INET stream socket."""
    socketio = SocketIO("localhost", 8080)
    try:
        assert socketio.host == "localhost"
        assert socketio.port == 8080

        # Identity comparison: a merely falsy value (0, None, "") must not pass.
        assert socketio.is_open is False

        assert socketio.sock.family == socket.AF_INET
        assert socketio.sock.type == socket.SOCK_STREAM
    finally:
        # The constructor created a real socket object; release it so the test
        # does not leak a file descriptor.
        socketio.close()
|
|
|
|
|
|
def test_socket_put():
    """SocketIO.put passes the message to the ``send`` method of the installed socket."""
    fake_sock = DummySocket()
    sio = SocketIO("localhost", 8080)
    # Swap in the recording dummy so nothing goes over the network.
    sio.sock = fake_sock

    sio.put(b"message")

    assert fake_sock.send_buffer == b"message"
|
|
|
|
|
|
def test_open():
    """SocketIO.open connects the installed socket to (host, port) and sets ``is_open``."""
    dsocket = DummySocket()
    socketio = SocketIO("localhost", 8080)
    # Swap in the recording dummy so no real connection is attempted.
    socketio.sock = dsocket

    socketio.open()

    # Identity comparison: a merely truthy value must not pass.
    assert socketio.is_open is True
    assert socketio.sock.host == socketio.host
    assert socketio.sock.port == socketio.port
|
|
|
|
|
|
def test_socket_open_with_timeout():
    """Check SocketIO.open(timeout=...) for a working and for an always-failing connect."""
    fake_sock = DummySocket()
    sio = SocketIO("localhost", 8080)
    sio.sock = fake_sock

    with mock.patch.object(fake_sock, "connect") as connect_mock:
        # Working connect: one attempt is made.
        sio.open(timeout=0.1)
        connect_mock.assert_called_once()

        connect_mock.reset_mock()

        # The retry loop sleeps 1 s between attempts, so only one attempt is
        # expected before the 0.4 s timeout.
        connect_mock.side_effect = Exception("Connection failed")
        with pytest.raises(ConnectionError):
            sio.open(timeout=0.4)
        connect_mock.assert_called_once()
|
|
|
|
|
|
def test_close():
    """SocketIO.close leaves ``sock`` as None and ``is_open`` as False."""
    socketio = SocketIO("localhost", 8080)

    socketio.close()

    # Singletons are compared by identity, so an object with a permissive
    # __eq__ or a merely falsy flag cannot satisfy the assertions.
    assert socketio.sock is None
    assert socketio.is_open is False
|
|
|
|
|
|
def test_socket_signal_get(signal):
    """
    Test that the callback mechanism of SocketSignal correctly handles recursive reads without
    causing multiple socket reads, and that the value is correctly cached and passed to the callback.
    """
    controller = signal.controller
    # Identity comparison: a merely truthy value must not pass.
    assert signal._auto_monitor is True

    # Bare annotation: a type hint for readers and tools, it assigns nothing.
    controller.sock: SocketMock
    # Replies the mock socket hands out; the assertions below expect
    # "value2" for the first read and "value1" for the second.
    controller.sock.buffer_recv = [b"value2", b"value1"]

    callback_read_buffer = []
    callback_value_buffer = []

    readback = signal.read()
    assert readback[signal.name]["value"] == "value2"

    def _test_cb(value, old_value, **kwargs):
        """Simulate a callback that triggers another read."""
        # Distinct local names so the outer `signal` / `readback` are not shadowed.
        source = kwargs["obj"]
        callback_value_buffer.append((value, old_value))
        nested_readback = source.read()
        callback_read_buffer.append(nested_readback)

    signal.subscribe(_test_cb, event_type=signal.SUB_VALUE, run=False)

    readback = signal.read()
    assert len(callback_read_buffer) == 1, "Callback should have been called once"
    assert len(callback_value_buffer) == 1, "Callback should have been called once"

    # The read issued from inside the callback must match the outer read.
    assert readback == callback_read_buffer[0]
    assert callback_value_buffer == [("value1", "value2")]
|
|
|
|
|
|
def test_socket_signal_put(signal):
    """
    Verify that SocketSignal.put sends the value over the socket and emits the
    subscription notifications expected for a value change.
    """
    controller = signal.controller
    # Bare annotation: a type hint for readers and tools, it assigns nothing.
    controller.sock: SocketMock

    controller.sock.buffer_recv = [b"value2", b"new_value", b"new_value"]

    reads_seen = []
    values_seen = []
    setpoints_seen = []

    first_read = signal.read()
    assert first_read[signal.name]["value"] == "value2"

    def _on_value(value, old_value, **kwargs):
        """Value callback that records the change and reads the signal again."""
        source = kwargs["obj"]
        values_seen.append((value, old_value))
        reads_seen.append(source.read())

    def _on_setpoint(value, old_value, **kwargs):
        """Setpoint callback that records the change and reads; this read should hit the socket."""
        source = kwargs["obj"]
        setpoints_seen.append((value, old_value))
        source.read()

    signal.subscribe(_on_setpoint, event_type=signal.SUB_SETPOINT, run=False)
    signal.subscribe(_on_value, event_type=signal.SUB_VALUE, run=False)

    # put() goes through super().put(...), which fires SUB_VALUE first and
    # SUB_SETPOINT second. The value callback therefore runs twice: once
    # directly from put(), and once more because the setpoint callback calls
    # signal.read().
    signal.put("new_value")

    assert controller.sock.buffer_put == [b"get\n", b"new_value\n", b"get\n"]
    assert len(setpoints_seen) == 1, "Setpoint callback should have been called once"
    assert len(reads_seen) == 2, "Readback callback should have been called twice"
    assert len(values_seen) == 2, "Value callback should have been called twice"
    assert setpoints_seen == [("new_value", "value2")]
    assert reads_seen[0][signal.name]["value"] == "new_value"
    assert reads_seen[1][signal.name]["value"] == "new_value"
    assert values_seen == [("new_value", "value2"), ("new_value", "new_value")]
|