Files
ophyd_devices/tests/test_socket.py

194 lines
5.9 KiB
Python

import socket
import time
from unittest import mock
import pytest
from bec_server.device_server.tests.utils import DMMock
from ophyd_devices.tests.utils import SocketMock
from ophyd_devices.utils.controller import Controller
from ophyd_devices.utils.socket import SocketIO, SocketSignal
class DummySocketSignal(SocketSignal):
"""Dummy SocketSignal class for testing the SocketSignal interface."""
def __init__(
self, name, controller: Controller, notify_bec=True, readback_timeout=None, **kwargs
):
super().__init__(
name=name, notify_bec=notify_bec, readback_timeout=readback_timeout, **kwargs
)
self.controller = controller
def _socket_get(self) -> str:
self._metadata["timestamp"] = time.monotonic()
return self.controller.socket_put_and_receive("get")
def _socket_set(self, value: str):
self.controller.socket_put_and_receive(value)
@pytest.fixture
def controller():
try:
dm = DMMock()
Controller._reset_controller()
controller = Controller(
name="controller",
socket_cls=SocketMock,
socket_host="localhost",
socket_port=8080,
device_manager=dm,
)
controller.on()
return controller
finally:
Controller._reset_controller()
@pytest.fixture
def signal(controller):
return DummySocketSignal(name="signal", controller=controller, readback_timeout=0.1)
class DummySocket:
AF_INET = 2
SOCK_STREAM = 1
def __init__(self) -> None:
self.address_family = None
self.socket_kind = None
self.timeout = None
def socket(self, address_family, socket_kind):
self.address_family = address_family
self.socket_kind = socket_kind
return self
def settimeout(self, timeout):
self.timeout = timeout
def send(self, msg, *args, **kwargs):
self.send_buffer = msg
def connect(self, address):
self.host = address[0]
self.port = address[1]
self.connected = True
def close(self):
self.connected = False
def test_socket_init():
socketio = SocketIO("localhost", 8080)
assert socketio.host == "localhost"
assert socketio.port == 8080
assert socketio.is_open == False
assert socketio.sock.family == socket.AF_INET
assert socketio.sock.type == socket.SOCK_STREAM
def test_socket_put():
dsocket = DummySocket()
socketio = SocketIO("localhost", 8080)
socketio.sock = dsocket
socketio.put(b"message")
assert dsocket.send_buffer == b"message"
def test_open():
dsocket = DummySocket()
socketio = SocketIO("localhost", 8080)
socketio.sock = dsocket
socketio.open()
assert socketio.is_open == True
assert socketio.sock.host == socketio.host
assert socketio.sock.port == socketio.port
def test_socket_open_with_timeout():
dsocket = DummySocket()
socketio = SocketIO("localhost", 8080)
socketio.sock = dsocket
with mock.patch.object(dsocket, "connect") as mock_connect:
socketio.open(timeout=0.1)
mock_connect.assert_called_once()
mock_connect.reset_mock()
# There is a 1s sleep in the retry loop, mock_connect should be called only once
mock_connect.side_effect = Exception("Connection failed")
with pytest.raises(ConnectionError):
socketio.open(timeout=0.4)
mock_connect.assert_called_once()
def test_close():
socketio = SocketIO("localhost", 8080)
socketio.close()
assert socketio.sock == None
assert socketio.is_open == False
def test_socket_signal_get(signal):
"""
Test that the get method of the SocketSignal class correctly retrives values from the socket,
and that it implements the caching and timeout mechanism to avoid excessive socket reads and/or recursions.
"""
# First get should call the socket and cache the value
controller = signal.controller
controller.sock: SocketMock
controller.sock.buffer_recv = [b"value2", b"value1"]
signal._readback_timeout = 0
readback = signal.read()
assert readback[signal.name]["value"] == "value2"
readback2 = signal.read()
assert readback2[signal.name]["value"] == "value1"
assert readback[signal.name]["timestamp"] != readback2[signal.name]["timestamp"]
controller.sock.buffer_recv = [b"value2"]
cb_bucket = []
read_value = None
signal._readback_timeout = 10
def _test_cb(value, old_value, **kwargs):
cb_bucket.append((value, old_value))
read_value = signal.read()
signal.subscribe(_test_cb, event_type=signal.SUB_VALUE, run=False)
signal._readback_timeout = 10
signal._last_readback = 0 # reset the last readback time to force a socket read
readback1 = signal.read()
assert readback1[signal.name]["value"] == "value2"
# The value should be cached, so it should not change
assert cb_bucket == [("value2", "value1")]
readback2 = signal.read()
for entry in ("value", "timestamp"):
assert readback1[signal.name][entry] == readback2[signal.name][entry]
def test_socket_signal_put(signal):
"""
Test that the put method of the SocketSignal class correctly sends values to the socket,
and that it implements the necessary subscription notifications for value changes.
"""
controller = signal.controller
controller.sock: SocketMock
cb_bucket = []
initial_value = signal._readback
def _test_cb(value, old_value, **kwargs):
cb_bucket.append((value, old_value))
signal.subscribe(_test_cb, event_type=signal.SUB_SETPOINT, run=False)
signal.put("new_value")
assert controller.sock.buffer_put == [b"new_value\n"]
assert cb_bucket == [("new_value", initial_value)]
signal.put("another_value")
assert controller.sock.buffer_put == [b"new_value\n", b"another_value\n"]
assert cb_bucket == [("new_value", initial_value), ("another_value", "new_value")]