Files
csaxs_bec/tests/tests_devices/test_galil.py
appel_c 34be4c69e5
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m14s
CI for csaxs_bec / test (push) Successful in 1m18s
test(galil-rio): Add tests for GalilRIOSignal
2026-01-16 11:13:08 +01:00

297 lines
9.7 KiB
Python

import copy
from unittest import mock
import numpy as np
import pytest
from ophyd_devices.tests.utils import SocketMock
from csaxs_bec.devices.npoint.npoint import NPointAxis, NPointController
from csaxs_bec.devices.omny.galil.fgalil_ophyd import FlomniGalilController, FlomniGalilMotor
from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor
from csaxs_bec.devices.omny.galil.galil_rio import GalilRIOSignal
from csaxs_bec.devices.omny.galil.lgalil_ophyd import LamniGalilController, LamniGalilMotor
from csaxs_bec.devices.omny.galil.ogalil_ophyd import OMNYGalilController, OMNYGalilMotor
from csaxs_bec.devices.omny.galil.sgalil_ophyd import GalilController, SGalilMotor
from csaxs_bec.devices.omny.rt.rt_flomni_ophyd import RtFlomniController, RtFlomniMotor
from csaxs_bec.devices.omny.rt.rt_lamni_ophyd import RtLamniController, RtLamniMotor
from csaxs_bec.devices.omny.rt.rt_omny_ophyd import RtOMNYController, RtOMNYMotor
from csaxs_bec.devices.smaract.smaract_ophyd import SmaractController, SmaractMotor
@pytest.fixture(scope="function")
def leyey(dm_with_devices):
    """Yield a ``LamniGalilMotor`` on axis "H" named "leyey" that talks to a ``SocketMock``.

    ``LamniGalilController._reset_controller()`` is called before the motor is
    created. The controller is switched on for the duration of the test and
    switched off and reset again afterwards.
    """
    LamniGalilController._reset_controller()
    motor = LamniGalilMotor(
        "H",
        name="leyey",
        host="mpc2680.psi.ch",
        port=8081,
        socket_cls=SocketMock,
        device_manager=dm_with_devices,
    )
    motor.controller.on()

    yield motor

    # Teardown: runs once the requesting test has finished
    motor.controller.off()
    motor.controller._reset_controller()
@pytest.fixture(scope="function")
def leyex(dm_with_devices):
    """Yield a ``LamniGalilMotor`` on axis "A" named "leyex" that talks to a ``SocketMock``.

    ``LamniGalilController._reset_controller()`` is called before the motor is
    created. The controller is switched on for the duration of the test and
    switched off and reset again afterwards.
    """
    LamniGalilController._reset_controller()
    leyex_motor = LamniGalilMotor(
        "A",
        # The device name matches the fixture name so it does not collide with
        # the "leyey" device used elsewhere in this module.
        name="leyex",
        host="mpc2680.psi.ch",
        port=8081,
        socket_cls=SocketMock,
        device_manager=dm_with_devices,
    )
    leyex_motor.controller.on()

    yield leyex_motor

    # Teardown: runs once the requesting test has finished
    leyex_motor.controller.off()
    leyex_motor.controller._reset_controller()
@pytest.mark.parametrize(
    "pos,msg,sign",
    [
        (1, b" -12800\n\r", 1),
        (-1, b" -12800\n\r", -1),
    ],
)
def test_axis_get(leyey, pos, msg, sign):
    """Check that the mocked controller reply ``msg`` is reported as ``pos`` for ``sign``.

    The position is verified both through ``read()`` and ``readback.get()``.
    """
    leyey.sign = sign

    # Start from an empty mock socket and queue the reply to be received
    sock = leyey.controller.sock
    sock.flush_buffer()
    sock.buffer_recv = msg

    reading = leyey.read()
    assert reading["leyey"]["value"] == pos
    assert leyey.readback.get() == pos
@pytest.mark.parametrize(
    "target_pos,socket_put_messages,socket_get_messages",
    [
        (
            0,
            [
                b"MG allaxref\r",
                b"MG_XQ0\r",
                b"naxis=7\r",
                b"ntarget=0.000\r",
                b"movereq=1\r",
                b"XQ#NEWPAR\r",
                b"MG_XQ0\r",
            ],
            [b"1.00", b"-1", b":", b":", b":", b":", b"-1"],
        )
    ],
)
def test_axis_put(leyey, target_pos, socket_put_messages, socket_get_messages):
    """Check the exact command sequence sent when ``user_setpoint`` is put to ``target_pos``.

    ``socket_get_messages`` are the queued mock replies; ``socket_put_messages``
    is the expected content of the mock socket's put buffer afterwards.
    """
    sock = leyey.controller.sock
    sock.flush_buffer()
    # Hand over a copy so the parametrized reply list itself is never modified
    sock.buffer_recv = copy.deepcopy(socket_get_messages)

    leyey.user_setpoint.put(target_pos)

    sent_commands = leyey.controller.sock.buffer_put
    assert sent_commands == socket_put_messages
@pytest.mark.parametrize(
    "axis_nr,direction,socket_put_messages,socket_get_messages",
    [
        (
            0,
            "forward",
            [
                b"naxis=0\r",
                b"ndir=1\r",
                b"XQ#NEWPAR\r",
                b"XQ#FES\r",
                b"MG_BGA\r",
                b"MGbcklact[0]\r",
                b"MG_XQ0\r",
                b"MG_XQ2\r",
                b"MG _LRA, _LFA\r",
            ],
            [b":", b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.000 0.000"],
        ),
        (
            1,
            "reverse",
            [
                b"naxis=1\r",
                b"ndir=-1\r",
                b"XQ#NEWPAR\r",
                b"XQ#FES\r",
                b"MG_BGB\r",
                b"MGbcklact[1]\r",
                b"MG_XQ0\r",
                b"MG_XQ2\r",
                b"MG _LRB, _LFB\r",
            ],
            [b":", b":", b":", b":", b"0", b"0", b"-1", b"-1", b"0.000 1.000"],
        ),
    ],
)
def test_drive_axis_to_limit(leyex, axis_nr, direction, socket_put_messages, socket_get_messages):
    """Check the commands sent by ``controller.drive_axis_to_limit(axis_nr, direction)``.

    ``socket_get_messages`` are the queued mock replies; ``socket_put_messages``
    is the expected content of the mock socket's put buffer afterwards.
    """
    sock = leyex.controller.sock
    sock.flush_buffer()
    # Hand over a copy so the parametrized reply list itself is never modified
    sock.buffer_recv = copy.deepcopy(socket_get_messages)

    leyex.controller.drive_axis_to_limit(axis_nr, direction)

    sent_commands = leyex.controller.sock.buffer_put
    assert sent_commands == socket_put_messages
@pytest.mark.parametrize(
    "axis_nr,socket_put_messages,socket_get_messages",
    [
        (
            0,
            [
                b"naxis=0\r",
                b"XQ#NEWPAR\r",
                b"XQ#FRM\r",
                b"MG_BGA\r",
                b"MGbcklact[0]\r",
                b"MG_XQ0\r",
                b"MG_XQ2\r",
                b"MG axisref[0]\r",
            ],
            [b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.00"],
        ),
        (
            1,
            [
                b"naxis=1\r",
                b"XQ#NEWPAR\r",
                b"XQ#FRM\r",
                b"MG_BGB\r",
                b"MGbcklact[1]\r",
                b"MG_XQ0\r",
                b"MG_XQ2\r",
                b"MG axisref[1]\r",
            ],
            [b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.00"],
        ),
    ],
)
def test_find_reference(leyex, axis_nr, socket_put_messages, socket_get_messages):
    """Check the commands sent by ``controller.find_reference(axis_nr)``.

    ``socket_get_messages`` are the queued mock replies; ``socket_put_messages``
    is the expected content of the mock socket's put buffer afterwards.
    """
    sock = leyex.controller.sock
    sock.flush_buffer()
    # Hand over a copy so the parametrized reply list itself is never modified
    sock.buffer_recv = copy.deepcopy(socket_get_messages)

    # NOTE(review): any exception raised by find_reference is only printed, so
    # this test verifies the sent command sequence and nothing else - confirm
    # that swallowing the error here is intentional.
    try:
        leyex.controller.find_reference(axis_nr)
    except Exception as err:
        print(err)

    sent_commands = leyex.controller.sock.buffer_put
    assert sent_commands == socket_put_messages
def test_wait_for_connection_called(dm_with_devices):
    """Check that ``wait_for_connection`` forwards its timeout to ``controller.on``.

    Each motor class below is instantiated against a ``SocketMock`` and its
    controller's ``on`` method is patched; the last recorded call must be
    ``on(timeout=5.0)``.
    """
    motor_controller_pairs = (
        (NPointAxis, NPointController),
        (FlomniGalilMotor, FlomniGalilController),
        (FuprGalilMotor, FuprGalilController),
        (LamniGalilMotor, LamniGalilController),
        (OMNYGalilMotor, OMNYGalilController),
        (SGalilMotor, GalilController),
        (RtFlomniMotor, RtFlomniController),
        (RtLamniMotor, RtLamniController),
        (RtOMNYMotor, RtOMNYController),
        (SmaractMotor, SmaractController),
    )

    for motor_cls, controller_cls in motor_controller_pairs:
        # Remember the class-level axis count so it can be put back afterwards
        original_axis_count = controller_cls._axes_per_controller
        try:
            controller_cls._reset_controller()
            controller_cls._axes_per_controller = 3
            motor = motor_cls(
                "C",
                name="test_motor",
                host="mpc2680.psi.ch",
                port=8081,
                socket_cls=SocketMock,
                device_manager=dm_with_devices,
            )
            with mock.patch.object(motor.controller, "on") as mock_on:
                motor.wait_for_connection(timeout=5.0)
                last_call = mock_on.call_args_list[-1]
                assert last_call == mock.call(timeout=5.0)
        finally:
            # Undo the class-level changes even if the checks above failed
            controller_cls._reset_controller()
            controller_cls._axes_per_controller = original_axis_count
########################
#### Test Galil RIO ####
########################
def test_galil_rio_signal_initialization(dm_with_devices):
    """
    Test that the Galil RIO signal can establish a connection.

    Checks the initial state of a freshly created signal (not connected, the
    controller's socket host and port, the number of analog channels and an
    all-zero readback), that ``wait_for_connection`` calls ``controller.on``
    exactly once with the given timeout, and that ``destroy`` leaves the
    signal disconnected.
    """
    # Create the signal before entering the try block: if the constructor
    # raises there is nothing to destroy, and touching the unbound name in
    # ``finally`` would hide the real error behind an UnboundLocalError.
    signal = GalilRIOSignal(
        name="galil_rio_signal", socket_cls=SocketMock, device_manager=dm_with_devices
    )
    try:
        # Test initial parameters settings
        assert signal.connected is False
        assert signal.controller._socket_host == "129.129.98.64"
        assert signal.controller._socket_port == 23
        assert signal._NUM_ANALOG_CH == 8
        assert np.isclose(signal._readback, [0.0] * 8).all()

        # Test connection establishment
        with mock.patch.object(signal.controller, "on") as mock_on:
            signal.wait_for_connection(timeout=5.0)
            mock_on.assert_called_once_with(timeout=5.0)
            assert signal.connected is True
    finally:
        signal.destroy()

    # Checked outside ``finally`` so it cannot replace a failure raised above
    assert signal.connected is False
@pytest.fixture
def galil_rio_signal(dm_with_devices):
    """Yield a ``GalilRIOSignal`` backed by a ``SocketMock`` after ``wait_for_connection()``.

    The signal is destroyed once the requesting test has finished, and also if
    waiting for the connection fails.
    """
    # Create the signal before entering the try block: if the constructor
    # raises there is nothing to destroy, and touching the unbound name in
    # ``finally`` would hide the real error behind an UnboundLocalError.
    rio_signal = GalilRIOSignal(
        name="galil_rio_signal", socket_cls=SocketMock, device_manager=dm_with_devices
    )
    try:
        rio_signal.wait_for_connection()
        yield rio_signal
    finally:
        rio_signal.destroy()
def test_galil_rio_signal_read(galil_rio_signal):
    """
    Test that the Galil RIO signal can read values correctly.

    A mocked eight-value reply is queued on the socket; the test then checks
    the values returned by ``read()``, the command written to the socket, the
    single subscription callback triggered by the read, and finally ``get()``.
    """
    expected_values = [1.234, 2.345, 3.456, 4.567, 5.678, 6.789, 7.890, 8.901]
    raw_reply = b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901"
    sock = galil_rio_signal.controller.sock

    # Record every (value, old_value) pair delivered to the subscription
    updates: list[tuple] = []

    def record_update(value, old_value, **kwargs):
        updates.append((value, old_value))

    galil_rio_signal.subscribe(record_update)

    # Queue the mocked controller reply and read it back through the signal
    sock.buffer_recv = [raw_reply]
    read_values = galil_rio_signal.read()[galil_rio_signal.name]["value"]
    assert np.allclose(read_values, expected_values)

    # Check the command that was sent to the socket
    assert sock.buffer_put == [b"MG@AN[0],@AN[1],@AN[2],@AN[3],@AN[4],@AN[5],@AN[6],@AN[7]\r"]

    # The read must have triggered exactly one callback: new values, old zeros
    assert len(updates) == 1
    new_value, previous_value = updates[0]
    assert np.allclose(new_value, expected_values)
    assert np.allclose(previous_value, [0.0] * 8)

    # get() needs the mock receive buffer to be filled again
    sock.buffer_recv = [raw_reply]
    get_values = galil_rio_signal.get()
    assert np.allclose(get_values, expected_values)