test: added tests for fupr and flomni galil

This commit is contained in:
2023-12-12 19:27:52 +01:00
parent bf38e89f79
commit 1c95220234
6 changed files with 285 additions and 135 deletions

View File

@ -8,6 +8,8 @@ from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError, ReadOnlyError
from prettytable import PrettyTable
from ophyd_devices.galil.galil_ophyd import (
BECConfigError,
GalilAxesReferenced,
@ -22,7 +24,6 @@ from ophyd_devices.galil.galil_ophyd import (
)
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected
from prettytable import PrettyTable
logger = bec_logger.logger
@ -315,31 +316,3 @@ class FuprGalilMotor(Device, PositionerBase):
def stop(self, *, success=False):
self.controller.stop_all_axes()
return super().stop(success=success)
# if __name__ == "__main__":
# mock = False
# if not mock:
# leyey = GalilMotor("H", name="leyey", host="mpc2680.psi.ch", port=8081, sign=-1)
# leyey.stage()
# status = leyey.move(0, wait=True)
# status = leyey.move(10, wait=True)
# leyey.read()
# leyey.get()
# leyey.describe()
# leyey.unstage()
# else:
# from ophyd_devices.utils.socket import SocketMock
# leyex = GalilMotor(
# "G", name="leyex", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
# )
# leyey = GalilMotor(
# "H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
# )
# leyex.stage()
# # leyey.stage()
# leyex.controller.galil_show_all()

View File

@ -369,11 +369,7 @@ class GalilMotorIsMoving(GalilSignalRO):
def get(self):
val = super().get()
if val is not None:
self._run_subs(
sub_type=self.SUB_VALUE,
value=val,
timestamp=time.time(),
)
self._run_subs(sub_type=self.SUB_VALUE, value=val, timestamp=time.time())
return val
@ -385,11 +381,7 @@ class GalilAxesReferenced(GalilSignalRO):
class GalilMotor(Device, PositionerBase):
USER_ACCESS = ["controller"]
readback = Cpt(
GalilReadbackSignal,
signal_name="readback",
kind="hinted",
)
readback = Cpt(GalilReadbackSignal, signal_name="readback", kind="hinted")
user_setpoint = Cpt(GalilSetpointSignal, signal_name="setpoint")
motor_resolution = Cpt(GalilMotorResolution, signal_name="resolution", kind="config")
motor_is_moving = Cpt(GalilMotorIsMoving, signal_name="motor_is_moving", kind="normal")
@ -520,18 +512,10 @@ class GalilMotor(Device, PositionerBase):
while self.motor_is_moving.get():
logger.info("motor is moving")
val = self.readback.read()
self._run_subs(
sub_type=self.SUB_READBACK,
value=val,
timestamp=time.time(),
)
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
time.sleep(0.1)
val = self.readback.read()
success = np.isclose(
val[self.name]["value"],
position,
atol=self.tolerance,
)
success = np.isclose(val[self.name]["value"], position, atol=self.tolerance)
if not success:
print(" stop")
@ -593,6 +577,7 @@ class GalilMotor(Device, PositionerBase):
if __name__ == "__main__":
# pytest: skip-file
mock = False
if not mock:
leyey = GalilMotor("H", name="leyey", host="mpc2680.psi.ch", port=8081, sign=-1)

View File

@ -14,6 +14,7 @@ def test_controller_off():
# make sure it is indempotent
controller.off()
controller._reset_controller()
def test_controller_on():
@ -28,3 +29,4 @@ def test_controller_on():
# make sure it is indempotent
controller.on()
socket_cls().open.assert_called_once()
controller._reset_controller()

60
tests/test_fupr_ophyd.py Normal file
View File

@ -0,0 +1,60 @@
from unittest import mock
import pytest
from utils import SocketMock
from ophyd_devices.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor
@pytest.fixture
def fsamroy():
FuprGalilController._reset_controller()
fsamroy_motor = FuprGalilMotor(
"A", name="fsamroy", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
)
fsamroy_motor.controller.on()
assert isinstance(fsamroy_motor.controller, FuprGalilController)
yield fsamroy_motor
fsamroy_motor.controller.off()
fsamroy_motor.controller._reset_controller()
@pytest.mark.parametrize(
"pos,msg_received,msg_put,sign",
[
(-0.5, b" -12800\n\r", [b"TPA\r", b"MG_BGA\r", b"TPA\r"], 1),
(-0.5, b" 12800\n\r", [b"TPA\r", b"MG_BGA\r", b"TPA\r"], -1),
],
)
def test_axis_get(fsamroy, pos, msg_received, msg_put, sign):
fsamroy.sign = sign
fsamroy.device_manager = mock.MagicMock()
fsamroy.controller.sock.flush_buffer()
fsamroy.controller.sock.buffer_recv = msg_received
val = fsamroy.read()
assert val["fsamroy"]["value"] == pos
assert fsamroy.readback.get() == pos
assert fsamroy.controller.sock.buffer_put == msg_put
@pytest.mark.parametrize(
"target_pos,socket_put_messages,socket_get_messages",
[
(
0,
[b"MG axisref\r", b"PAA=0\r", b"PAA=0\r", b"BGA\r"],
[b"1.00", b"-1", b":", b":", b":", b":", b"-1"],
)
],
)
def test_axis_put(fsamroy, target_pos, socket_put_messages, socket_get_messages):
fsamroy.controller.sock.flush_buffer()
fsamroy.controller.sock.buffer_recv = socket_get_messages
fsamroy.user_setpoint.put(target_pos)
assert fsamroy.controller.sock.buffer_put == socket_put_messages
def test_drive_axis_to_limit(fsamroy):
fsamroy.controller.sock.flush_buffer()
with pytest.raises(NotImplementedError):
fsamroy.controller.drive_axis_to_limit(0, "forward")

View File

@ -3,26 +3,36 @@ from unittest import mock
import pytest
from utils import SocketMock
from ophyd_devices.galil.galil_ophyd import GalilMotor
from ophyd_devices.galil.galil_ophyd import GalilController, GalilMotor
@pytest.mark.parametrize(
"pos,msg,sign",
[
(1, b" -12800\n\r", 1),
(-1, b" -12800\n\r", -1),
],
)
def test_axis_get(pos, msg, sign):
leyey = GalilMotor(
"H",
name="leyey",
host="mpc2680.psi.ch",
port=8081,
sign=sign,
socket_cls=SocketMock,
@pytest.fixture(scope="function")
def leyey():
GalilController._reset_controller()
leyey_motor = GalilMotor(
"H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
)
leyey.controller.on()
leyey_motor.controller.on()
yield leyey_motor
leyey_motor.controller.off()
leyey_motor.controller._reset_controller()
@pytest.fixture(scope="function")
def leyex():
GalilController._reset_controller()
leyex_motor = GalilMotor(
"A", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
)
leyex_motor.controller.on()
yield leyex_motor
leyex_motor.controller.off()
leyex_motor.controller._reset_controller()
@pytest.mark.parametrize("pos,msg,sign", [(1, b" -12800\n\r", 1), (-1, b" -12800\n\r", -1)])
def test_axis_get(leyey, pos, msg, sign):
leyey.sign = sign
leyey.controller.sock.flush_buffer()
leyey.controller.sock.buffer_recv = msg
val = leyey.read()
@ -44,21 +54,11 @@ def test_axis_get(pos, msg, sign):
b"XQ#NEWPAR\r",
b"MG_XQ0\r",
],
[
b"1.00",
b"-1",
b":",
b":",
b":",
b":",
b"-1",
],
),
[b"1.00", b"-1", b":", b":", b":", b":", b"-1"],
)
],
)
def test_axis_put(target_pos, socket_put_messages, socket_get_messages):
leyey = GalilMotor("H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock)
leyey.controller.on()
def test_axis_put(leyey, target_pos, socket_put_messages, socket_get_messages):
leyey.controller.sock.flush_buffer()
leyey.controller.sock.buffer_recv = socket_get_messages
leyey.user_setpoint.put(target_pos)
@ -82,17 +82,7 @@ def test_axis_put(target_pos, socket_put_messages, socket_get_messages):
b"MG_XQ2\r",
b"MG _LRA, _LFA\r",
],
[
b":",
b":",
b":",
b":",
b"0",
b"0",
b"-1",
b"-1",
b"1.000 0.000",
],
[b":", b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.000 0.000"],
),
(
1,
@ -108,27 +98,15 @@ def test_axis_put(target_pos, socket_put_messages, socket_get_messages):
b"MG_XQ2\r",
b"MG _LRB, _LFB\r",
],
[
b":",
b":",
b":",
b":",
b"0",
b"0",
b"-1",
b"-1",
b"0.000 1.000",
],
[b":", b":", b":", b":", b"0", b"0", b"-1", b"-1", b"0.000 1.000"],
),
],
)
def test_drive_axis_to_limit(axis_nr, direction, socket_put_messages, socket_get_messages):
leyey = GalilMotor("A", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock)
leyey.controller.on()
leyey.controller.sock.flush_buffer()
leyey.controller.sock.buffer_recv = socket_get_messages
leyey.controller.drive_axis_to_limit(axis_nr, direction)
assert leyey.controller.sock.buffer_put == socket_put_messages
def test_drive_axis_to_limit(leyex, axis_nr, direction, socket_put_messages, socket_get_messages):
leyex.controller.sock.flush_buffer()
leyex.controller.sock.buffer_recv = socket_get_messages
leyex.controller.drive_axis_to_limit(axis_nr, direction)
assert leyex.controller.sock.buffer_put == socket_put_messages
@pytest.mark.parametrize(
@ -146,16 +124,7 @@ def test_drive_axis_to_limit(axis_nr, direction, socket_put_messages, socket_get
b"MG_XQ2\r",
b"MG axisref[0]\r",
],
[
b":",
b":",
b":",
b"0",
b"0",
b"-1",
b"-1",
b"1.00",
],
[b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.00"],
),
(
1,
@ -169,23 +138,12 @@ def test_drive_axis_to_limit(axis_nr, direction, socket_put_messages, socket_get
b"MG_XQ2\r",
b"MG axisref[1]\r",
],
[
b":",
b":",
b":",
b"0",
b"0",
b"-1",
b"-1",
b"1.00",
],
[b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.00"],
),
],
)
def test_find_reference(axis_nr, socket_put_messages, socket_get_messages):
leyey = GalilMotor("A", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock)
leyey.controller.on()
leyey.controller.sock.flush_buffer()
leyey.controller.sock.buffer_recv = socket_get_messages
leyey.controller.find_reference(axis_nr)
assert leyey.controller.sock.buffer_put == socket_put_messages
def test_find_reference(leyex, axis_nr, socket_put_messages, socket_get_messages):
leyex.controller.sock.flush_buffer()
leyex.controller.sock.buffer_recv = socket_get_messages
leyex.controller.find_reference(axis_nr)
assert leyex.controller.sock.buffer_put == socket_put_messages

172
tests/test_galil_flomni.py Normal file
View File

@ -0,0 +1,172 @@
from unittest import mock
import pytest
from utils import SocketMock
from ophyd_devices.galil.fgalil_ophyd import FlomniGalilController, FlomniGalilMotor
@pytest.fixture(scope="function")
def leyey():
FlomniGalilController._reset_controller()
leyey_motor = FlomniGalilMotor(
"H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
)
leyey_motor.controller.on()
yield leyey_motor
leyey_motor.controller.off()
leyey_motor.controller._reset_controller()
@pytest.fixture(scope="function")
def leyex():
FlomniGalilController._reset_controller()
leyex_motor = FlomniGalilMotor(
"H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
)
leyex_motor.controller.on()
yield leyex_motor
leyex_motor.controller.off()
leyex_motor.controller._reset_controller()
@pytest.mark.parametrize("pos,msg,sign", [(1, b" -12800\n\r", 1), (-1, b" -12800\n\r", -1)])
def test_axis_get(leyey, pos, msg, sign):
leyey.sign = sign
leyey.controller.sock.flush_buffer()
leyey.controller.sock.buffer_recv = msg
val = leyey.read()
assert val["leyey"]["value"] == pos
assert leyey.readback.get() == pos
@pytest.mark.parametrize(
"target_pos,socket_put_messages,socket_get_messages",
[
(
0,
[
b"MG allaxref\r",
b"MG_XQ0\r",
b"naxis=7\r",
b"ntarget=0.000\r",
b"movereq=1\r",
b"XQ#NEWPAR\r",
b"MG_XQ0\r",
],
[b"1.00", b"-1", b":", b":", b":", b":", b"-1"],
)
],
)
def test_axis_put(leyey, target_pos, socket_put_messages, socket_get_messages):
leyey.controller.sock.flush_buffer()
leyey.controller.sock.buffer_recv = socket_get_messages
leyey.user_setpoint.put(target_pos)
assert leyey.controller.sock.buffer_put == socket_put_messages
@pytest.mark.parametrize(
"axis_nr,direction,socket_put_messages,socket_get_messages",
[
(
0,
"forward",
[
b"naxis=0\r",
b"ndir=1\r",
b"XQ#NEWPAR\r",
b"XQ#FES\r",
b"MG_XQ0\r",
b"MG _MOA\r",
b"MG_XQ0\r",
b"MG _MOA\r",
b"MG _LRA, _LFA\r",
],
[b":", b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.000 0.000"],
),
(
1,
"reverse",
[
b"naxis=1\r",
b"ndir=-1\r",
b"XQ#NEWPAR\r",
b"XQ#FES\r",
b"MG_XQ0\r",
b"MG _MOB\r",
b"MG_XQ0\r",
b"MG _MOB\r",
b"MG _LRB, _LFB\r",
],
[b":", b":", b":", b":", b"0", b"0", b"-1", b"-1", b"0.000 1.000"],
),
],
)
def test_drive_axis_to_limit(leyex, axis_nr, direction, socket_put_messages, socket_get_messages):
leyex.controller.sock.flush_buffer()
leyex.controller.sock.buffer_recv = socket_get_messages
leyex.controller.drive_axis_to_limit(axis_nr, direction)
assert leyex.controller.sock.buffer_put == socket_put_messages
@pytest.mark.parametrize(
"axis_nr,socket_put_messages,socket_get_messages",
[
(
0,
[
b"naxis=0\r",
b"XQ#NEWPAR\r",
b"XQ#FRM\r",
b"MG_XQ0\r",
b"MG _MOA\r",
b"MG_XQ0\r",
b"MG _MOA\r",
b"MG axisref[0]\r",
],
[b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.00"],
),
(
1,
[
b"naxis=1\r",
b"XQ#NEWPAR\r",
b"XQ#FRM\r",
b"MG_XQ0\r",
b"MG _MOB\r",
b"MG_XQ0\r",
b"MG _MOB\r",
b"MG axisref[1]\r",
],
[b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.00"],
),
],
)
def test_find_reference(leyex, axis_nr, socket_put_messages, socket_get_messages):
leyex.controller.sock.flush_buffer()
leyex.controller.sock.buffer_recv = socket_get_messages
leyex.controller.find_reference(axis_nr)
assert leyex.controller.sock.buffer_put == socket_put_messages
@pytest.mark.parametrize(
"axis_Id,socket_put_messages,socket_get_messages,triggered",
[
("A", [b"MG @IN[14]\r"], [b" 1.0000\n"], True),
("B", [b"MG @IN[14]\r"], [b" 0.0000\n"], False),
],
)
def test_fosaz_light_curtain_is_triggered(
axis_Id, socket_put_messages, socket_get_messages, triggered
):
"""test that the light curtain is triggered"""
fosaz = FlomniGalilMotor(
axis_Id, name="fosaz", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
)
fosaz.controller.on()
fosaz.controller.sock.flush_buffer()
fosaz.controller.sock.buffer_recv = socket_get_messages
assert fosaz.controller.fosaz_light_curtain_is_triggered() == triggered
assert fosaz.controller.sock.buffer_put == socket_put_messages
fosaz.controller.off()
fosaz.controller._reset_controller()