From e499ccf2e703051245b70649c5aa703cf274efdd Mon Sep 17 00:00:00 2001 From: appel_c Date: Thu, 22 Jan 2026 16:10:37 +0100 Subject: [PATCH] wip test --- csaxs_bec/devices/omny/galil/galil_rio.py | 5 +- tests/tests_devices/test_galil.py | 141 ++++++++++++---------- 2 files changed, 77 insertions(+), 69 deletions(-) diff --git a/csaxs_bec/devices/omny/galil/galil_rio.py b/csaxs_bec/devices/omny/galil/galil_rio.py index c8ce230..ac1302d 100644 --- a/csaxs_bec/devices/omny/galil/galil_rio.py +++ b/csaxs_bec/devices/omny/galil/galil_rio.py @@ -154,9 +154,8 @@ class GalilRIO(PSIDeviceBase): # This yields tuples of cpt, signal for walk in self.walk_signals(): if isinstance(walk.item, GalilRIOSignalRO): - channels.append( - (walk.item._channel, walk.item.name) - ) # pylint: disable=protected-access + # pylint: disable=protected-access + channels.append((walk.item._channel, walk.item.name)) # Read all channels in one command cmd = "MG@" + ",@".join([f"AN[{ii}]" for ii, _ in channels]) diff --git a/tests/tests_devices/test_galil.py b/tests/tests_devices/test_galil.py index 68520e0..ea9dc62 100644 --- a/tests/tests_devices/test_galil.py +++ b/tests/tests_devices/test_galil.py @@ -8,6 +8,7 @@ from ophyd_devices.tests.utils import SocketMock from csaxs_bec.devices.npoint.npoint import NPointAxis, NPointController from csaxs_bec.devices.omny.galil.fgalil_ophyd import FlomniGalilController, FlomniGalilMotor from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor +from csaxs_bec.devices.omny.galil.galil_rio import GalilRIO, GalilRIOController, GalilRIOSignalRO from csaxs_bec.devices.omny.galil.lgalil_ophyd import LamniGalilController, LamniGalilMotor from csaxs_bec.devices.omny.galil.ogalil_ophyd import OMNYGalilController, OMNYGalilMotor from csaxs_bec.devices.omny.galil.sgalil_ophyd import GalilController, SGalilMotor @@ -187,6 +188,7 @@ def test_wait_for_connection_called(dm_with_devices): (RtLamniMotor, RtLamniController), (RtOMNYMotor, RtOMNYController), (SmaractMotor, SmaractController), + (GalilRIO, GalilRIOController), ] for motor_cls, controller_cls in testable_connections: # Store values to restore later @@ -217,79 +219,86 @@ def test_wait_for_connection_called(dm_with_devices): ######################## -# def test_galil_rio_signal_initialization(dm_with_devices): -# """ -# Test that the Galil RIO signal can establish a connection. -# """ -# try: -# signal = GalilRIOSignal( -# name="galil_rio_signal", socket_cls=SocketMock, device_manager=dm_with_devices -# ) -# # Test initial parameters settings -# assert signal.connected is False -# assert signal.controller._socket_host == "129.129.98.64" -# assert signal.controller._socket_port == 23 -# assert signal._NUM_ANALOG_CH == 8 -# assert np.isclose(signal._readback, [0.0] * 8).all() - -# # Test connection establishment -# with mock.patch.object(signal.controller, "on") as mock_on: -# signal.wait_for_connection(timeout=5.0) -# mock_on.assert_called_once_with(timeout=5.0) -# assert signal.connected is True -# finally: -# signal.destroy() - -# assert signal.connected is False +@pytest.fixture +def galil_rio(dm_with_devices): + try: + rio = GalilRIO( + name="galil_rio", + host="129.129.0.1", + socket_cls=SocketMock, + device_manager=dm_with_devices, + ) + rio.wait_for_connection() + yield rio + finally: + rio.destroy() -# @pytest.fixture -# def galil_rio_signal(dm_with_devices): -# try: -# rio_signal = GalilRIOSignal( -# name="galil_rio_signal", socket_cls=SocketMock, device_manager=dm_with_devices -# ) -# rio_signal.wait_for_connection() -# yield rio_signal -# finally: -# rio_signal.destroy() +def test_galil_rio_initialization(galil_rio): + """ + Test that the Galil RIO signal can establish a connection. + """ + assert galil_rio.controller.connected is True + # All signals should be connected if the controller is connected + for walk in galil_rio.walk_signals(): + signal = walk.item + assert signal.connected is True + + assert galil_rio.controller._socket_host == "129.129.0.1" + assert galil_rio.controller._socket_port == 23 # Default port -# def test_galil_rio_signal_read(galil_rio_signal): -# """ -# Test that the Galil RIO signal can read values correctly. -# """ -# # Add callback to update readback -# value_callback_buffer: list[tuple] = [] +def test_galil_rio_signal_read(galil_rio): + """ + Test that the Galil RIO signal can read values correctly. + """ -# def value_callback(value, old_value, **kwargs): -# value_callback_buffer.append((value, old_value)) + # Mock the socket to return specific values + galil_rio.controller.sock.buffer_recv = [b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901"] -# galil_rio_signal.subscribe(value_callback) + read_values = galil_rio.read() + assert len(read_values) == 8 # 8 channels + expected_values = { + galil_rio.an_ch0.name: {"value": 1.234}, + galil_rio.an_ch1.name: {"value": 2.345}, + galil_rio.an_ch2.name: {"value": 3.456}, + galil_rio.an_ch3.name: {"value": 4.567}, + galil_rio.an_ch4.name: {"value": 5.678}, + galil_rio.an_ch5.name: {"value": 6.789}, + galil_rio.an_ch6.name: {"value": 7.890}, + galil_rio.an_ch7.name: {"value": 8.901}, + } + for signal_name, expected in expected_values.items(): + assert np.isclose(read_values[signal_name]["value"], expected["value"]) + assert "timestamp" in read_values[signal_name] -# # Mock the socket to return specific values -# galil_rio_signal.controller.sock.buffer_recv = [ -# b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901" -# ] + # Check communication command to socker + assert galil_rio.controller.sock.buffer_put == [ + b"MG@AN[0],@AN[1],@AN[2],@AN[3],@AN[4],@AN[5],@AN[6],@AN[7]\r" + ] -# read_values = galil_rio_signal.read()[galil_rio_signal.name]["value"] -# expected_values = [1.234, 2.345, 3.456, 4.567, 5.678, 6.789, 7.890, 8.901] + # Add callback to update readback + value_callback_buffer: list[tuple] = [] -# # Check values -# assert np.isclose(read_values, expected_values).all() -# # Check communication command to socker -# assert galil_rio_signal.controller.sock.buffer_put == [ -# b"MG@AN[0],@AN[1],@AN[2],@AN[3],@AN[4],@AN[5],@AN[6],@AN[7]\r" -# ] -# # Check callback invocation -# assert len(value_callback_buffer) == 1 -# # Check callback values match expected -# assert np.isclose(value_callback_buffer[0][0], expected_values).all() -# assert np.isclose(value_callback_buffer[0][1], [0.0] * 8).all() + def value_callback(value, old_value, **kwargs): + value_callback_buffer.append((value, old_value)) -# # Check that get works, requires to reset the mock receive buffer -# galil_rio_signal.controller.sock.buffer_recv = [ -# b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901" -# ] -# get_values = galil_rio_signal.get() -# assert np.isclose(get_values, expected_values).all() + galil_rio.an_ch0.subscribe(value_callback) + galil_rio.controller.sock.buffer_recv = [b" 1.234"] + + val = galil_rio.an_ch0.read() + + assert np.isclose(val[galil_rio.an_ch0.name]["value"], 1.234) + + # Check callback invocation + assert len(value_callback_buffer) == 1 + # Check callback values match expected + assert np.isclose(value_callback_buffer[0], (1.234, 0)).all() + + # Test another read with different value + value_callback_buffer.clear() + galil_rio.controller.sock.buffer_recv = [b" 2.5"] + + val = galil_rio.an_ch0.read() + + assert np.isclose(value_callback_buffer[0], (2.5, 1.234)).all()