"""Tests for the socket-based motor controllers (Galil, RT, Smaract, NPoint) and the Galil RIO.

All devices are created with ``socket_cls=SocketMock``; the tests fill the mock's
``buffer_recv`` with canned replies and compare ``buffer_put`` with the commands
that are expected to have been sent.
"""

import copy
import inspect
from unittest import mock

import numpy as np
import pytest
from ophyd_devices.tests.utils import SocketMock

from csaxs_bec.devices.npoint.npoint import NPointAxis, NPointController
from csaxs_bec.devices.omny.galil.fgalil_ophyd import FlomniGalilController, FlomniGalilMotor
from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor
from csaxs_bec.devices.omny.galil.galil_rio import (
    GalilRIO,
    GalilRIOAnalogSignalRO,
    GalilRIOController,
)
from csaxs_bec.devices.omny.galil.lgalil_ophyd import LamniGalilController, LamniGalilMotor
from csaxs_bec.devices.omny.galil.ogalil_ophyd import OMNYGalilController, OMNYGalilMotor
from csaxs_bec.devices.omny.galil.sgalil_ophyd import GalilController, SGalilMotor
from csaxs_bec.devices.omny.rt.rt_flomni_ophyd import RtFlomniController, RtFlomniMotor
from csaxs_bec.devices.omny.rt.rt_lamni_ophyd import RtLamniController, RtLamniMotor
from csaxs_bec.devices.omny.rt.rt_omny_ophyd import RtOMNYController, RtOMNYMotor
from csaxs_bec.devices.smaract.smaract_ophyd import SmaractController, SmaractMotor


@pytest.fixture(scope="function")
def leyey(dm_with_devices):
    """Yield a ``LamniGalilMotor`` on axis "H" named "leyey", backed by ``SocketMock``.

    The controller is reset before the motor is created and switched on; after
    the test it is switched off and reset again.
    """
    LamniGalilController._reset_controller()
    leyey_motor = LamniGalilMotor(
        "H",
        name="leyey",
        host="mpc2680.psi.ch",
        port=8081,
        socket_cls=SocketMock,
        device_manager=dm_with_devices,
    )
    leyey_motor.controller.on()
    yield leyey_motor
    leyey_motor.controller.off()
    leyey_motor.controller._reset_controller()


@pytest.fixture(scope="function")
def leyex(dm_with_devices):
    """Yield a ``LamniGalilMotor`` on axis "A", backed by ``SocketMock``.

    The controller is reset before the motor is created and switched on; after
    the test it is switched off and reset again.
    """
    LamniGalilController._reset_controller()
    leyex_motor = LamniGalilMotor(
        "A",
        # NOTE(review): the name is "leyey", identical to the ``leyey`` fixture.
        # This looks like a copy-paste slip, but it is kept as-is because the
        # motor class may look the name up elsewhere -- confirm before renaming.
        name="leyey",
        host="mpc2680.psi.ch",
        port=8081,
        socket_cls=SocketMock,
        device_manager=dm_with_devices,
    )
    leyex_motor.controller.on()
    yield leyex_motor
    leyex_motor.controller.off()
    leyex_motor.controller._reset_controller()


@pytest.mark.parametrize("pos,msg,sign", [(1, b" -12800\n\r", 1), (-1, b" -12800\n\r", -1)])
def test_axis_get(leyey, pos, msg, sign):
    """Check that ``read()`` and ``readback.get()`` return ``pos`` for the given reply and sign."""
    leyey.sign = sign
    leyey.controller.sock.flush_buffer()
    leyey.controller.sock.buffer_recv = msg
    val = leyey.read()
    assert val["leyey"]["value"] == pos
    assert leyey.readback.get() == pos


@pytest.mark.parametrize(
    "target_pos,socket_put_messages,socket_get_messages",
    [
        (
            0,
            [
                b"MG allaxref\r",
                b"MG_XQ0\r",
                b"naxis=7\r",
                b"ntarget=0.000\r",
                b"movereq=1\r",
                b"XQ#NEWPAR\r",
                b"MG_XQ0\r",
            ],
            [b"1.00", b"-1", b":", b":", b":", b":", b"-1"],
        )
    ],
)
def test_axis_put(leyey, target_pos, socket_put_messages, socket_get_messages):
    """Check the commands sent to the socket when the user setpoint is written."""
    leyey.controller.sock.flush_buffer()
    # Hand the mock its own copy so the parametrized list is not shared with it.
    leyey.controller.sock.buffer_recv = copy.deepcopy(socket_get_messages)
    leyey.user_setpoint.put(target_pos)
    assert leyey.controller.sock.buffer_put == socket_put_messages


@pytest.mark.parametrize(
    "axis_nr,direction,socket_put_messages,socket_get_messages",
    [
        (
            0,
            "forward",
            [
                b"naxis=0\r",
                b"ndir=1\r",
                b"XQ#NEWPAR\r",
                b"XQ#FES\r",
                b"MG_BGA\r",
                b"MGbcklact[0]\r",
                b"MG_XQ0\r",
                b"MG_XQ2\r",
                b"MG _LRA, _LFA\r",
            ],
            [b":", b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.000 0.000"],
        ),
        (
            1,
            "reverse",
            [
                b"naxis=1\r",
                b"ndir=-1\r",
                b"XQ#NEWPAR\r",
                b"XQ#FES\r",
                b"MG_BGB\r",
                b"MGbcklact[1]\r",
                b"MG_XQ0\r",
                b"MG_XQ2\r",
                b"MG _LRB, _LFB\r",
            ],
            [b":", b":", b":", b":", b"0", b"0", b"-1", b"-1", b"0.000 1.000"],
        ),
    ],
)
def test_drive_axis_to_limit(leyex, axis_nr, direction, socket_put_messages, socket_get_messages):
    """Check the commands sent to the socket by ``drive_axis_to_limit``."""
    leyex.controller.sock.flush_buffer()
    # Hand the mock its own copy so the parametrized list is not shared with it.
    leyex.controller.sock.buffer_recv = copy.deepcopy(socket_get_messages)
    leyex.controller.drive_axis_to_limit(axis_nr, direction)
    assert leyex.controller.sock.buffer_put == socket_put_messages


@pytest.mark.parametrize(
    "axis_nr,socket_put_messages,socket_get_messages",
    [
        (
            0,
            [
                b"naxis=0\r",
                b"XQ#NEWPAR\r",
                b"XQ#FRM\r",
                b"MG_BGA\r",
                b"MGbcklact[0]\r",
                b"MG_XQ0\r",
                b"MG_XQ2\r",
                b"MG axisref[0]\r",
            ],
            [b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.00"],
        ),
        (
            1,
            [
                b"naxis=1\r",
                b"XQ#NEWPAR\r",
                b"XQ#FRM\r",
                b"MG_BGB\r",
                b"MGbcklact[1]\r",
                b"MG_XQ0\r",
                b"MG_XQ2\r",
                b"MG axisref[1]\r",
            ],
            [b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.00"],
        ),
    ],
)
def test_find_reference(leyex, axis_nr, socket_put_messages, socket_get_messages):
    """Check the commands sent to the socket by ``find_reference``."""
    leyex.controller.sock.flush_buffer()
    # Hand the mock its own copy so the parametrized list is not shared with it.
    leyex.controller.sock.buffer_recv = copy.deepcopy(socket_get_messages)
    # NOTE(review): any exception raised here is printed and ignored so that the
    # check of the sent commands below still runs. Confirm whether
    # ``find_reference`` is expected to raise with these mocked replies; if not,
    # the try/except should be removed.
    try:
        leyex.controller.find_reference(axis_nr)
    except Exception as e:
        print(e)
    assert leyex.controller.sock.buffer_put == socket_put_messages


def test_wait_for_connection_called(dm_with_devices):
    """Test that wait_for_connection is called on all motors that have a socket controller."""
    dm = dm_with_devices
    testable_connections = [
        (NPointAxis, NPointController),
        (FlomniGalilMotor, FlomniGalilController),
        (FuprGalilMotor, FuprGalilController),
        (LamniGalilMotor, LamniGalilController),
        (OMNYGalilMotor, OMNYGalilController),
        (SGalilMotor, GalilController),
        (RtFlomniMotor, RtFlomniController),
        (RtLamniMotor, RtLamniController),
        (RtOMNYMotor, RtOMNYController),
        (SmaractMotor, SmaractController),
        (GalilRIO, GalilRIOController),
    ]
    for motor_cls, controller_cls in testable_connections:
        # Store values to restore later
        ctrl_axis_backup = controller_cls._axes_per_controller
        try:
            controller_cls._reset_controller()
            controller_cls._axes_per_controller = 3

            # Constructors with a positional parameter besides ``self`` get an
            # axis identifier; the others are built from keyword arguments only.
            positional_params = inspect.getfullargspec(motor_cls.__init__).args
            args = ("C",) if len(positional_params) > 1 else ()
            kwargs = {
                "name": "test_motor",
                "host": "mpc2680.psi.ch",
                "port": 8081,
                "device_manager": dm,
                "socket_cls": SocketMock,
            }
            motor = motor_cls(*args, **kwargs)

            with mock.patch.object(motor.controller, "on") as mock_on:
                motor.wait_for_connection(timeout=5.0)
                assert mock_on.call_args_list[-1] == mock.call(timeout=5.0)

            # Make sure destroy calls controller off
            with mock.patch.object(motor.controller, "off") as mock_off:
                motor.destroy()
                assert mock_off.call_count == 1
                assert mock_off.call_args_list[0] == mock.call(update_config=False)
                assert motor._destroyed is True
        finally:
            # Restore the class-level state even if an assertion above failed.
            controller_cls._reset_controller()
            controller_cls._axes_per_controller = ctrl_axis_backup


########################
#### Test Galil RIO ####
########################


@pytest.fixture
def galil_rio(dm_with_devices):
    """Yield a connected ``GalilRIO`` backed by ``SocketMock`` and destroy it afterwards.

    The device is created before the ``try`` block: if the constructor raised
    inside it, the ``finally`` clause would reference an unbound ``rio`` and the
    resulting ``UnboundLocalError`` would hide the original error.
    """
    rio = GalilRIO(
        name="galil_rio",
        host="129.129.0.1",
        socket_cls=SocketMock,
        device_manager=dm_with_devices,
    )
    try:
        rio.wait_for_connection()
        yield rio
    finally:
        rio.destroy()


def test_galil_rio_initialization(galil_rio):
    """
    Test that the Galil RIO signal can establish a connection.
    """
    controller = galil_rio.controller
    assert controller.connected is True

    # All signals should be connected if the controller is connected
    for walk in galil_rio.walk_signals():
        assert walk.item.connected is True

    assert controller._socket_host == "129.129.0.1"
    assert controller._socket_port == 23  # Default port


def test_galil_rio_signal_read(galil_rio):
    """
    Test that the Galil RIO signal can read values correctly.
    """
    sock = galil_rio.controller.sock
    ch0 = galil_rio.analog_in.ch0
    ch0_name = ch0.name

    ###########
    ## Test read of all channels
    ###########

    assert ch0._readback_timeout == 0.1  # Default read timeout of 100ms

    # Mock the socket to return specific values
    analog_buffer = b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901\r\n"
    sock.buffer_recv = []  # Clear any existing buffer
    sock.buffer_recv.append(analog_buffer)

    read_values = galil_rio.read()
    assert len(read_values) == 8  # 8 channels

    expected_by_name = {
        galil_rio.analog_in.ch0.name: {"value": 1.234},
        galil_rio.analog_in.ch1.name: {"value": 2.345},
        galil_rio.analog_in.ch2.name: {"value": 3.456},
        galil_rio.analog_in.ch3.name: {"value": 4.567},
        galil_rio.analog_in.ch4.name: {"value": 5.678},
        galil_rio.analog_in.ch5.name: {"value": 6.789},
        galil_rio.analog_in.ch6.name: {"value": 7.890},
        galil_rio.analog_in.ch7.name: {"value": 8.901},
    }

    # All timestamps should be the same
    reference_timestamp = read_values[ch0_name]["timestamp"]
    assert all(ret["timestamp"] == reference_timestamp for ret in read_values.values())

    # Check values
    for signal_name, expected in expected_by_name.items():
        assert np.isclose(read_values[signal_name]["value"], expected["value"])
        assert "timestamp" in read_values[signal_name]

    # Check communication command to socket
    assert sock.buffer_put == [
        b"MG@AN[0], @AN[1], @AN[2], @AN[3], @AN[4], @AN[5], @AN[6], @AN[7]\r"
    ]

    ###########
    ## Test read of single channel with callback
    ###########

    # Add callback to update readback; each entry is the dict returned by read()
    value_callback_buffer: list[dict] = []

    def value_callback(value, old_value, **kwargs):
        """Read the whole device from within the callback and store the result."""
        obj = kwargs.get("obj")
        galil = obj.parent.parent
        readback = galil.read()
        value_callback_buffer.append(readback)

    ch0.subscribe(value_callback, run=False)
    sock.buffer_recv = [b" 2.5 2.6 2.7 2.8 2.9 3.0 3.1 3.2"]
    expected_after_update = [2.5, 2.6, 2.7, 2.8, 2.9, 3.0, 3.1, 3.2]

    ##################
    ## Test cached readback
    ##################

    # Should have used the cached value
    for walk in galil_rio.walk_signals():
        walk.item._readback_timeout = 10  # Make sure cached read is used

    ret = ch0.read()

    # Should not trigger callback since value did not change
    assert np.isclose(ret[ch0_name]["value"], 1.234)
    # Same timestamp as for another channel as this is cached read
    assert np.isclose(ret[ch0_name]["timestamp"], galil_rio.analog_in.ch7.timestamp)
    assert len(value_callback_buffer) == 0

    ##################
    ## Test uncached read from controller
    ##################

    # Now force a read from the controller
    ch0._last_readback = 0  # Force read from controller
    ret = ch0.read()

    assert np.isclose(ret[ch0_name]["value"], 2.5)

    # Check callback invocation, but only 1 callback even with galil_rio.read() call in callback
    assert len(value_callback_buffer) == 1
    callback_reading = value_callback_buffer[0]
    values = [entry["value"] for entry in callback_reading.values()]
    assert np.isclose(values, expected_after_update).all()
    callback_timestamp = callback_reading[ch0_name]["timestamp"]
    assert all(entry["timestamp"] == callback_timestamp for entry in callback_reading.values())


def test_galil_rio_digital_out_signal(galil_rio):
    """
    Test that the Galil RIO digital output signal can be set correctly.
    """
    sock = galil_rio.controller.sock

    ## Test Read from digital output channels
    num_channels = galil_rio.digital_out.ch0._NUM_DIGITAL_OUTPUT_CHANNELS
    expected_put_buffer = [f"MG@OUT[{ii}]\r".encode() for ii in range(num_channels)]
    sock.buffer_recv = [b" 1.000" for _ in range(num_channels)]  # Mock response for readback

    digital_read = galil_rio.read_configuration()  # Read to populate readback values

    for walk in galil_rio.digital_out.walk_signals():
        assert np.isclose(digital_read[walk.item.name]["value"], 1.0)

    assert sock.buffer_put == expected_put_buffer

    # Test writing to digital output channels
    sock.buffer_put = []  # Clear buffer put
    sock.buffer_recv = [b":"]  # Mock response for readback

    # Set digital output channel 0 to high
    galil_rio.digital_out.ch0.put(1)
    assert sock.buffer_put == [b"SB0\r"]

    # Set digital output channel 0 to low
    sock.buffer_put = []  # Clear buffer put
    sock.buffer_recv = [b":"]  # Mock response for readback
    galil_rio.digital_out.ch0.put(0)
    assert sock.buffer_put == [b"CB0\r"]