404 lines
14 KiB
Python
404 lines
14 KiB
Python
import copy
|
|
import inspect
|
|
from unittest import mock
|
|
|
|
import numpy as np
|
|
import pytest
|
|
from ophyd_devices.tests.utils import SocketMock
|
|
|
|
from csaxs_bec.devices.npoint.npoint import NPointAxis, NPointController
|
|
from csaxs_bec.devices.omny.galil.fgalil_ophyd import FlomniGalilController, FlomniGalilMotor
|
|
from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor
|
|
from csaxs_bec.devices.omny.galil.galil_rio import (
|
|
GalilRIO,
|
|
GalilRIOAnalogSignalRO,
|
|
GalilRIOController,
|
|
)
|
|
from csaxs_bec.devices.omny.galil.lgalil_ophyd import LamniGalilController, LamniGalilMotor
|
|
from csaxs_bec.devices.omny.galil.ogalil_ophyd import OMNYGalilController, OMNYGalilMotor
|
|
from csaxs_bec.devices.omny.galil.sgalil_ophyd import GalilController, SGalilMotor
|
|
from csaxs_bec.devices.omny.rt.rt_flomni_ophyd import RtFlomniController, RtFlomniMotor
|
|
from csaxs_bec.devices.omny.rt.rt_lamni_ophyd import RtLamniController, RtLamniMotor
|
|
from csaxs_bec.devices.omny.rt.rt_omny_ophyd import RtOMNYController, RtOMNYMotor
|
|
from csaxs_bec.devices.smaract.smaract_ophyd import SmaractController, SmaractMotor
|
|
|
|
|
|
@pytest.fixture(scope="function")
def leyey(dm_with_devices):
    """Yield a ``LamniGalilMotor`` named "leyey" that uses ``SocketMock`` as socket class.

    The Lamni Galil controller is reset before the motor is created and
    switched on before the motor is handed to the test. After the test the
    controller is switched off and reset again.
    """
    LamniGalilController._reset_controller()
    motor_kwargs = {
        "name": "leyey",
        "host": "mpc2680.psi.ch",
        "port": 8081,
        "socket_cls": SocketMock,
        "device_manager": dm_with_devices,
    }
    motor = LamniGalilMotor("H", **motor_kwargs)
    motor.controller.on()
    yield motor
    # Teardown: runs once the test using this fixture has finished.
    motor.controller.off()
    motor.controller._reset_controller()
|
|
|
|
|
|
@pytest.fixture(scope="function")
def leyex(dm_with_devices):
    """Yield a ``LamniGalilMotor`` named "leyex" that uses ``SocketMock`` as socket class.

    The Lamni Galil controller is reset before the motor is created and
    switched on before the motor is handed to the test. After the test the
    controller is switched off and reset again.
    """
    LamniGalilController._reset_controller()
    leyex_motor = LamniGalilMotor(
        "A",
        # The device name matches the fixture name; it was previously "leyey",
        # a copy-paste leftover from the sibling fixture.
        name="leyex",
        host="mpc2680.psi.ch",
        port=8081,
        socket_cls=SocketMock,
        device_manager=dm_with_devices,
    )
    leyex_motor.controller.on()
    yield leyex_motor
    # Teardown: runs once the test using this fixture has finished.
    leyex_motor.controller.off()
    leyex_motor.controller._reset_controller()
|
|
|
|
|
|
@pytest.mark.parametrize("pos,msg,sign", [(1, b" -12800\n\r", 1), (-1, b" -12800\n\r", -1)])
def test_axis_get(leyey, pos, msg, sign):
    """Check that ``read()`` and ``readback.get()`` return ``pos`` for the mocked reply ``msg``.

    The motor's ``sign`` attribute is set from the parametrization before the
    socket reply is queued.
    """
    leyey.sign = sign
    sock = leyey.controller.sock
    sock.flush_buffer()
    sock.buffer_recv = msg
    reading = leyey.read()
    assert reading["leyey"]["value"] == pos
    assert leyey.readback.get() == pos
|
|
|
|
|
|
@pytest.mark.parametrize(
    "target_pos,socket_put_messages,socket_get_messages",
    [
        (
            0,
            [
                b"MG allaxref\r",
                b"MG_XQ0\r",
                b"naxis=7\r",
                b"ntarget=0.000\r",
                b"movereq=1\r",
                b"XQ#NEWPAR\r",
                b"MG_XQ0\r",
            ],
            [b"1.00", b"-1", b":", b":", b":", b":", b"-1"],
        )
    ],
)
def test_axis_put(leyey, target_pos, socket_put_messages, socket_get_messages):
    """Check the commands written to the socket when the user setpoint is put.

    ``socket_get_messages`` are queued as mocked replies; afterwards the
    socket's put buffer must equal ``socket_put_messages``.
    """
    sock = leyey.controller.sock
    sock.flush_buffer()
    # Deep copy so the parametrized reply list itself is never handed to the mock.
    sock.buffer_recv = copy.deepcopy(socket_get_messages)
    leyey.user_setpoint.put(target_pos)
    assert sock.buffer_put == socket_put_messages
|
|
|
|
|
|
@pytest.mark.parametrize(
    "axis_nr,direction,socket_put_messages,socket_get_messages",
    [
        (
            0,
            "forward",
            [
                b"naxis=0\r",
                b"ndir=1\r",
                b"XQ#NEWPAR\r",
                b"XQ#FES\r",
                b"MG_BGA\r",
                b"MGbcklact[0]\r",
                b"MG_XQ0\r",
                b"MG_XQ2\r",
                b"MG _LRA, _LFA\r",
            ],
            [b":", b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.000 0.000"],
        ),
        (
            1,
            "reverse",
            [
                b"naxis=1\r",
                b"ndir=-1\r",
                b"XQ#NEWPAR\r",
                b"XQ#FES\r",
                b"MG_BGB\r",
                b"MGbcklact[1]\r",
                b"MG_XQ0\r",
                b"MG_XQ2\r",
                b"MG _LRB, _LFB\r",
            ],
            [b":", b":", b":", b":", b"0", b"0", b"-1", b"-1", b"0.000 1.000"],
        ),
    ],
)
def test_drive_axis_to_limit(leyex, axis_nr, direction, socket_put_messages, socket_get_messages):
    """Check the commands written to the socket by ``controller.drive_axis_to_limit``.

    ``socket_get_messages`` are queued as mocked replies; afterwards the
    socket's put buffer must equal ``socket_put_messages``.
    """
    controller = leyex.controller
    sock = controller.sock
    sock.flush_buffer()
    # Deep copy so the parametrized reply list itself is never handed to the mock.
    sock.buffer_recv = copy.deepcopy(socket_get_messages)
    controller.drive_axis_to_limit(axis_nr, direction)
    assert sock.buffer_put == socket_put_messages
|
|
|
|
|
|
@pytest.mark.parametrize(
    "axis_nr,socket_put_messages,socket_get_messages",
    [
        (
            0,
            [
                b"naxis=0\r",
                b"XQ#NEWPAR\r",
                b"XQ#FRM\r",
                b"MG_BGA\r",
                b"MGbcklact[0]\r",
                b"MG_XQ0\r",
                b"MG_XQ2\r",
                b"MG axisref[0]\r",
            ],
            [b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.00"],
        ),
        (
            1,
            [
                b"naxis=1\r",
                b"XQ#NEWPAR\r",
                b"XQ#FRM\r",
                b"MG_BGB\r",
                b"MGbcklact[1]\r",
                b"MG_XQ0\r",
                b"MG_XQ2\r",
                b"MG axisref[1]\r",
            ],
            [b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.00"],
        ),
    ],
)
def test_find_reference(leyex, axis_nr, socket_put_messages, socket_get_messages):
    """Check the commands written to the socket by ``controller.find_reference``.

    ``socket_get_messages`` are queued as mocked replies; afterwards the
    socket's put buffer must equal ``socket_put_messages``.
    """
    controller = leyex.controller
    sock = controller.sock
    sock.flush_buffer()
    # Deep copy so the parametrized reply list itself is never handed to the mock.
    sock.buffer_recv = copy.deepcopy(socket_get_messages)
    try:
        controller.find_reference(axis_nr)
    except Exception as e:
        # Any exception from find_reference is only printed; the command
        # sequence is still verified below.
        # NOTE(review): which exception is expected here is not visible in
        # this file - consider narrowing the except clause.
        print(e)
    assert sock.buffer_put == socket_put_messages
|
|
|
|
|
|
def test_wait_for_connection_called(dm_with_devices):
    """Test the connect/destroy hand-off between devices and their socket controller.

    For every (device class, controller class) pair this checks that
    ``wait_for_connection(timeout=5.0)`` calls ``controller.on(timeout=5.0)``
    and that ``destroy()`` calls ``controller.off(update_config=False)``
    exactly once and sets ``_destroyed``.
    """
    dm = dm_with_devices
    testable_connections = [
        (NPointAxis, NPointController),
        (FlomniGalilMotor, FlomniGalilController),
        (FuprGalilMotor, FuprGalilController),
        (LamniGalilMotor, LamniGalilController),
        (OMNYGalilMotor, OMNYGalilController),
        (SGalilMotor, GalilController),
        (RtFlomniMotor, RtFlomniController),
        (RtLamniMotor, RtLamniController),
        (RtOMNYMotor, RtOMNYController),
        (SmaractMotor, SmaractController),
        (GalilRIO, GalilRIOController),
    ]
    for motor_cls, controller_cls in testable_connections:
        # Store the class attribute so it can be restored in the finally clause
        ctrl_axis_backup = controller_cls._axes_per_controller
        try:
            controller_cls._reset_controller()
            # NOTE(review): presumably raised to 3 so that the "C" argument
            # below is accepted by every controller - confirm.
            controller_cls._axes_per_controller = 3

            # Classes whose __init__ takes a positional parameter besides
            # ``self`` get "C" passed for it (presumably the axis identifier).
            positional_params = inspect.getfullargspec(motor_cls.__init__).args
            args = ("C",) if len(positional_params) > 1 else ()
            kwargs = {
                "name": "test_motor",
                "host": "mpc2680.psi.ch",
                "port": 8081,
                "device_manager": dm,
                "socket_cls": SocketMock,
            }
            motor = motor_cls(*args, **kwargs)
            with mock.patch.object(motor.controller, "on") as mock_on:
                motor.wait_for_connection(timeout=5.0)
                assert mock_on.call_args_list[-1] == mock.call(timeout=5.0)

            # Make sure destroy calls controller off
            with mock.patch.object(motor.controller, "off") as mock_off:
                motor.destroy()
                assert mock_off.call_count == 1
                assert mock_off.call_args_list[0] == mock.call(update_config=False)
                assert motor._destroyed is True

        finally:
            # Restore class-level state so later tests are unaffected
            controller_cls._reset_controller()
            controller_cls._axes_per_controller = ctrl_axis_backup
|
|
|
|
|
|
########################
|
|
#### Test Galil RIO ####
|
|
########################
|
|
|
|
|
|
@pytest.fixture
def galil_rio(dm_with_devices):
    """Yield a connected ``GalilRIO`` that uses ``SocketMock`` as socket class.

    The device is destroyed after the test, including when
    ``wait_for_connection`` or the test itself raises.
    """
    # Construct outside the try block: if the constructor raises there is
    # nothing to destroy, and the original error must not be masked by an
    # UnboundLocalError from the finally clause.
    rio = GalilRIO(
        name="galil_rio",
        host="129.129.0.1",
        socket_cls=SocketMock,
        device_manager=dm_with_devices,
    )
    try:
        rio.wait_for_connection()
        yield rio
    finally:
        rio.destroy()
|
|
|
|
|
|
def test_galil_rio_initialization(galil_rio):
    """
    Test that the Galil RIO device and all of its signals report a connection.
    """
    controller = galil_rio.controller
    assert controller.connected is True
    # Every signal of the device must report connected as well
    for walk in galil_rio.walk_signals():
        assert walk.item.connected is True

    # Host as passed by the fixture; the fixture passes no port
    assert controller._socket_host == "129.129.0.1"
    assert controller._socket_port == 23  # Default port
|
|
|
|
|
|
def test_galil_rio_signal_read(galil_rio):
    """
    Test that the Galil RIO analog input signals can read values correctly.

    Covers a read of all channels, a cached single-channel read, and a forced
    single-channel read that triggers a value callback.
    """
    analog_in = galil_rio.analog_in
    ch0 = analog_in.ch0
    sock = galil_rio.controller.sock

    ###########
    ## Test read of all channels
    ###########

    assert ch0._readback_timeout == 0.1  # Default read timeout of 100ms
    # Queue exactly one mocked reply holding all eight channel values
    analog_reply = b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901\r\n"
    sock.buffer_recv = []  # Clear any existing buffer
    sock.buffer_recv.append(analog_reply)
    read_values = galil_rio.read()
    assert len(read_values) == 8  # 8 channels

    first_read_values = (1.234, 2.345, 3.456, 4.567, 5.678, 6.789, 7.890, 8.901)
    expected_by_name = {
        getattr(analog_in, f"ch{index}").name: {"value": channel_value}
        for index, channel_value in enumerate(first_read_values)
    }
    # All timestamps should be the same
    reference_timestamp = read_values[ch0.name]["timestamp"]
    assert all(ret["timestamp"] == reference_timestamp for ret in read_values.values())
    # Check values
    for signal_name, expected in expected_by_name.items():
        assert np.isclose(read_values[signal_name]["value"], expected["value"])
        assert "timestamp" in read_values[signal_name]

    # Check communication command to socket
    assert sock.buffer_put == [
        b"MG@AN[0], @AN[1], @AN[2], @AN[3], @AN[4], @AN[5], @AN[6], @AN[7]\r"
    ]

    ###########
    ## Test read of single channel with callback
    ###########

    # Each callback invocation stores a full device read in this list
    callback_reads: list[dict] = []

    def record_device_read(value, old_value, **kwargs):
        obj = kwargs.get("obj")
        galil = obj.parent.parent
        callback_reads.append(galil.read())

    ch0.subscribe(record_device_read, run=False)
    sock.buffer_recv = [b" 2.5 2.6 2.7 2.8 2.9 3.0 3.1 3.2"]
    second_read_values = [2.5, 2.6, 2.7, 2.8, 2.9, 3.0, 3.1, 3.2]

    ##################
    ## Test cached readback
    ##################

    # Should have used the cached value
    for walk in galil_rio.walk_signals():
        walk.item._readback_timeout = 10  # Make sure cached read is used
    ret = ch0.read()

    # Should not trigger callback since value did not change
    assert np.isclose(ret[ch0.name]["value"], 1.234)
    # Same timestamp as for another channel as this is cached read
    assert np.isclose(ret[ch0.name]["timestamp"], analog_in.ch7.timestamp)
    assert len(callback_reads) == 0

    ##################
    ## Test uncached read from controller
    ##################

    # Now force a read from the controller
    ch0._last_readback = 0  # Force read from controller
    ret = ch0.read()

    assert np.isclose(ret[ch0.name]["value"], 2.5)

    # Check callback invocation, but only 1 callback even with galil_rio.read() call in callback
    assert len(callback_reads) == 1
    callback_read = callback_reads[0]
    values = [entry["value"] for entry in callback_read.values()]
    assert np.isclose(values, second_read_values).all()
    callback_timestamp = callback_read[ch0.name]["timestamp"]
    assert all(entry["timestamp"] == callback_timestamp for entry in callback_read.values())
|
|
|
|
|
|
def test_galil_rio_digital_out_signal(galil_rio):
    """
    Test that the Galil RIO digital output signals can be read back and set.

    Reading the configuration must query every digital output channel, and
    putting 1 / 0 to channel 0 must send the ``SB0`` / ``CB0`` command.
    """
    sock = galil_rio.controller.sock
    digital_out = galil_rio.digital_out

    ## Test Read from digital output channels
    num_channels = digital_out.ch0._NUM_DIGITAL_OUTPUT_CHANNELS
    expected_put_buffer = [f"MG@OUT[{channel}]\r".encode() for channel in range(num_channels)]
    # One mocked reply per channel, each reporting the value 1.000
    mocked_replies = [" 1.000".encode() for _ in range(num_channels)]

    sock.buffer_recv = mocked_replies  # Mock response for readback

    digital_read = galil_rio.read_configuration()  # Read to populate readback values

    for walk in digital_out.walk_signals():
        assert np.isclose(digital_read[walk.item.name]["value"], 1.0)

    assert sock.buffer_put == expected_put_buffer

    # Test writing to digital output channels
    sock.buffer_put = []  # Clear buffer put
    sock.buffer_recv = [b":"]  # Mock response for the put command

    # Set digital output channel 0 to high
    digital_out.ch0.put(1)
    assert sock.buffer_put == [b"SB0\r"]

    # Set digital output channel 0 to low
    sock.buffer_put = []  # Clear buffer put
    sock.buffer_recv = [b":"]  # Mock response for the put command
    digital_out.ch0.put(0)
    assert sock.buffer_put == [b"CB0\r"]
|