wip test
Some checks failed
CI for csaxs_bec / test (pull_request) Failing after 1m13s
CI for csaxs_bec / test (push) Failing after 1m17s

This commit is contained in:
2026-01-22 16:10:37 +01:00
parent e8b0c65f09
commit e499ccf2e7
2 changed files with 77 additions and 69 deletions

View File

@@ -154,9 +154,8 @@ class GalilRIO(PSIDeviceBase):
# This yields tuples of cpt, signal
for walk in self.walk_signals():
if isinstance(walk.item, GalilRIOSignalRO):
channels.append(
(walk.item._channel, walk.item.name)
) # pylint: disable=protected-access
# pylint: disable=protected-access
channels.append((walk.item._channel, walk.item.name))
# Read all channels in one command
cmd = "MG@" + ",@".join([f"AN[{ii}]" for ii, _ in channels])

View File

@@ -8,6 +8,7 @@ from ophyd_devices.tests.utils import SocketMock
from csaxs_bec.devices.npoint.npoint import NPointAxis, NPointController
from csaxs_bec.devices.omny.galil.fgalil_ophyd import FlomniGalilController, FlomniGalilMotor
from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor
from csaxs_bec.devices.omny.galil.galil_rio import GalilRIO, GalilRIOController, GalilRIOSignalRO
from csaxs_bec.devices.omny.galil.lgalil_ophyd import LamniGalilController, LamniGalilMotor
from csaxs_bec.devices.omny.galil.ogalil_ophyd import OMNYGalilController, OMNYGalilMotor
from csaxs_bec.devices.omny.galil.sgalil_ophyd import GalilController, SGalilMotor
@@ -187,6 +188,7 @@ def test_wait_for_connection_called(dm_with_devices):
(RtLamniMotor, RtLamniController),
(RtOMNYMotor, RtOMNYController),
(SmaractMotor, SmaractController),
(GalilRIO, GalilRIOController),
]
for motor_cls, controller_cls in testable_connections:
# Store values to restore later
@@ -217,79 +219,86 @@ def test_wait_for_connection_called(dm_with_devices):
########################
# def test_galil_rio_signal_initialization(dm_with_devices):
# """
# Test that the Galil RIO signal can establish a connection.
# """
# try:
# signal = GalilRIOSignal(
# name="galil_rio_signal", socket_cls=SocketMock, device_manager=dm_with_devices
# )
# # Test initial parameters settings
# assert signal.connected is False
# assert signal.controller._socket_host == "129.129.98.64"
# assert signal.controller._socket_port == 23
# assert signal._NUM_ANALOG_CH == 8
# assert np.isclose(signal._readback, [0.0] * 8).all()
# # Test connection establishment
# with mock.patch.object(signal.controller, "on") as mock_on:
# signal.wait_for_connection(timeout=5.0)
# mock_on.assert_called_once_with(timeout=5.0)
# assert signal.connected is True
# finally:
# signal.destroy()
# assert signal.connected is False
@pytest.fixture
def galil_rio(dm_with_devices):
try:
rio = GalilRIO(
name="galil_rio",
host="129.129.0.1",
socket_cls=SocketMock,
device_manager=dm_with_devices,
)
rio.wait_for_connection()
yield rio
finally:
rio.destroy()
# @pytest.fixture
# def galil_rio_signal(dm_with_devices):
# try:
# rio_signal = GalilRIOSignal(
# name="galil_rio_signal", socket_cls=SocketMock, device_manager=dm_with_devices
# )
# rio_signal.wait_for_connection()
# yield rio_signal
# finally:
# rio_signal.destroy()
def test_galil_rio_initialization(galil_rio):
"""
Test that the Galil RIO signal can establish a connection.
"""
assert galil_rio.controller.connected is True
# All signals should be connected if the controller is connected
for walk in galil_rio.walk_signals():
signal = walk.item
assert signal.connected is True
assert galil_rio.controller._socket_host == "129.129.0.1"
assert galil_rio.controller._socket_port == 23 # Default port
# def test_galil_rio_signal_read(galil_rio_signal):
# """
# Test that the Galil RIO signal can read values correctly.
# """
# # Add callback to update readback
# value_callback_buffer: list[tuple] = []
def test_galil_rio_signal_read(galil_rio):
"""
Test that the Galil RIO signal can read values correctly.
"""
# def value_callback(value, old_value, **kwargs):
# value_callback_buffer.append((value, old_value))
# Mock the socket to return specific values
galil_rio.controller.sock.buffer_recv = [b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901"]
# galil_rio_signal.subscribe(value_callback)
read_values = galil_rio.read()
assert len(read_values) == 8 # 8 channels
expected_values = {
galil_rio.an_ch0.name: {"value": 1.234},
galil_rio.an_ch1.name: {"value": 2.345},
galil_rio.an_ch2.name: {"value": 3.456},
galil_rio.an_ch3.name: {"value": 4.567},
galil_rio.an_ch4.name: {"value": 5.678},
galil_rio.an_ch5.name: {"value": 6.789},
galil_rio.an_ch6.name: {"value": 7.890},
galil_rio.an_ch7.name: {"value": 8.901},
}
for signal_name, expected in expected_values.items():
assert np.isclose(read_values[signal_name]["value"], expected["value"])
assert "timestamp" in read_values[signal_name]
# # Mock the socket to return specific values
# galil_rio_signal.controller.sock.buffer_recv = [
# b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901"
# ]
# Check communication command to socker
assert galil_rio.controller.sock.buffer_put == [
b"MG@AN[0],@AN[1],@AN[2],@AN[3],@AN[4],@AN[5],@AN[6],@AN[7]\r"
]
# read_values = galil_rio_signal.read()[galil_rio_signal.name]["value"]
# expected_values = [1.234, 2.345, 3.456, 4.567, 5.678, 6.789, 7.890, 8.901]
# Add callback to update readback
value_callback_buffer: list[tuple] = []
# # Check values
# assert np.isclose(read_values, expected_values).all()
# # Check communication command to socker
# assert galil_rio_signal.controller.sock.buffer_put == [
# b"MG@AN[0],@AN[1],@AN[2],@AN[3],@AN[4],@AN[5],@AN[6],@AN[7]\r"
# ]
# # Check callback invocation
# assert len(value_callback_buffer) == 1
# # Check callback values match expected
# assert np.isclose(value_callback_buffer[0][0], expected_values).all()
# assert np.isclose(value_callback_buffer[0][1], [0.0] * 8).all()
def value_callback(value, old_value, **kwargs):
value_callback_buffer.append((value, old_value))
# # Check that get works, requires to reset the mock receive buffer
# galil_rio_signal.controller.sock.buffer_recv = [
# b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901"
# ]
# get_values = galil_rio_signal.get()
# assert np.isclose(get_values, expected_values).all()
galil_rio.an_ch0.subscribe(value_callback)
galil_rio.controller.sock.buffer_recv = [b" 1.234"]
val = galil_rio.an_ch0.read()
assert np.isclose(val[galil_rio.an_ch0.name]["value"], 1.234)
# Check callback invocation
assert len(value_callback_buffer) == 1
# Check callback values match expected
assert np.isclose(value_callback_buffer[0], (1.234, 0)).all()
# Test another read with different value
value_callback_buffer.clear()
galil_rio.controller.sock.buffer_recv = [b" 2.5"]
val = galil_rio.an_ch0.read()
assert np.isclose(value_callback_buffer[0], (2.5, 1.234)).all()