diff --git a/ophyd_devices/galil/fupr_ophyd.py b/ophyd_devices/galil/fupr_ophyd.py index f7aa2cf..d0c26ed 100644 --- a/ophyd_devices/galil/fupr_ophyd.py +++ b/ophyd_devices/galil/fupr_ophyd.py @@ -8,6 +8,8 @@ from ophyd import Component as Cpt from ophyd import Device, PositionerBase, Signal from ophyd.status import wait as status_wait from ophyd.utils import LimitError, ReadOnlyError +from prettytable import PrettyTable + from ophyd_devices.galil.galil_ophyd import ( BECConfigError, GalilAxesReferenced, @@ -22,7 +24,6 @@ from ophyd_devices.galil.galil_ophyd import ( ) from ophyd_devices.utils.controller import Controller, threadlocked from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected -from prettytable import PrettyTable logger = bec_logger.logger @@ -315,31 +316,3 @@ class FuprGalilMotor(Device, PositionerBase): def stop(self, *, success=False): self.controller.stop_all_axes() return super().stop(success=success) - - -# if __name__ == "__main__": -# mock = False -# if not mock: -# leyey = GalilMotor("H", name="leyey", host="mpc2680.psi.ch", port=8081, sign=-1) -# leyey.stage() -# status = leyey.move(0, wait=True) -# status = leyey.move(10, wait=True) -# leyey.read() - -# leyey.get() -# leyey.describe() - -# leyey.unstage() -# else: -# from ophyd_devices.utils.socket import SocketMock - -# leyex = GalilMotor( -# "G", name="leyex", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock -# ) -# leyey = GalilMotor( -# "H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock -# ) -# leyex.stage() -# # leyey.stage() - -# leyex.controller.galil_show_all() diff --git a/ophyd_devices/galil/galil_ophyd.py b/ophyd_devices/galil/galil_ophyd.py index a0292da..4498800 100644 --- a/ophyd_devices/galil/galil_ophyd.py +++ b/ophyd_devices/galil/galil_ophyd.py @@ -369,11 +369,7 @@ class GalilMotorIsMoving(GalilSignalRO): def get(self): val = super().get() if val is not None: - self._run_subs( - sub_type=self.SUB_VALUE, - value=val, - timestamp=time.time(), - ) + self._run_subs(sub_type=self.SUB_VALUE, value=val, timestamp=time.time()) return val @@ -385,11 +381,7 @@ class GalilAxesReferenced(GalilSignalRO): class GalilMotor(Device, PositionerBase): USER_ACCESS = ["controller"] - readback = Cpt( - GalilReadbackSignal, - signal_name="readback", - kind="hinted", - ) + readback = Cpt(GalilReadbackSignal, signal_name="readback", kind="hinted") user_setpoint = Cpt(GalilSetpointSignal, signal_name="setpoint") motor_resolution = Cpt(GalilMotorResolution, signal_name="resolution", kind="config") motor_is_moving = Cpt(GalilMotorIsMoving, signal_name="motor_is_moving", kind="normal") @@ -520,18 +512,10 @@ class GalilMotor(Device, PositionerBase): while self.motor_is_moving.get(): logger.info("motor is moving") val = self.readback.read() - self._run_subs( - sub_type=self.SUB_READBACK, - value=val, - timestamp=time.time(), - ) + self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time()) time.sleep(0.1) val = self.readback.read() - success = np.isclose( - val[self.name]["value"], - position, - atol=self.tolerance, - ) + success = np.isclose(val[self.name]["value"], position, atol=self.tolerance) if not success: print(" stop") @@ -593,6 +577,7 @@ class GalilMotor(Device, PositionerBase): if __name__ == "__main__": + # pytest: skip-file mock = False if not mock: leyey = GalilMotor("H", name="leyey", host="mpc2680.psi.ch", port=8081, sign=-1) diff --git a/tests/test_controller.py b/tests/test_controller.py index 6d69c13..6543587 100644 --- a/tests/test_controller.py +++ b/tests/test_controller.py @@ -14,6 +14,7 @@ def test_controller_off(): # make sure it is indempotent controller.off() + controller._reset_controller() def test_controller_on(): @@ -28,3 +29,4 @@ def test_controller_on(): # make sure it is indempotent controller.on() socket_cls().open.assert_called_once() + controller._reset_controller() diff --git a/tests/test_fupr_ophyd.py b/tests/test_fupr_ophyd.py new file mode 100644 index 0000000..62589b8 --- /dev/null +++ b/tests/test_fupr_ophyd.py @@ -0,0 +1,60 @@ +from unittest import mock + +import pytest +from utils import SocketMock + +from ophyd_devices.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor + + +@pytest.fixture +def fsamroy(): + FuprGalilController._reset_controller() + fsamroy_motor = FuprGalilMotor( + "A", name="fsamroy", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock + ) + fsamroy_motor.controller.on() + assert isinstance(fsamroy_motor.controller, FuprGalilController) + yield fsamroy_motor + fsamroy_motor.controller.off() + fsamroy_motor.controller._reset_controller() + + +@pytest.mark.parametrize( + "pos,msg_received,msg_put,sign", + [ + (-0.5, b" -12800\n\r", [b"TPA\r", b"MG_BGA\r", b"TPA\r"], 1), + (-0.5, b" 12800\n\r", [b"TPA\r", b"MG_BGA\r", b"TPA\r"], -1), + ], +) +def test_axis_get(fsamroy, pos, msg_received, msg_put, sign): + fsamroy.sign = sign + fsamroy.device_manager = mock.MagicMock() + fsamroy.controller.sock.flush_buffer() + fsamroy.controller.sock.buffer_recv = msg_received + val = fsamroy.read() + assert val["fsamroy"]["value"] == pos + assert fsamroy.readback.get() == pos + assert fsamroy.controller.sock.buffer_put == msg_put + + +@pytest.mark.parametrize( + "target_pos,socket_put_messages,socket_get_messages", + [ + ( + 0, + [b"MG axisref\r", b"PAA=0\r", b"PAA=0\r", b"BGA\r"], + [b"1.00", b"-1", b":", b":", b":", b":", b"-1"], + ) + ], +) +def test_axis_put(fsamroy, target_pos, socket_put_messages, socket_get_messages): + fsamroy.controller.sock.flush_buffer() + fsamroy.controller.sock.buffer_recv = socket_get_messages + fsamroy.user_setpoint.put(target_pos) + assert fsamroy.controller.sock.buffer_put == socket_put_messages + + +def test_drive_axis_to_limit(fsamroy): + fsamroy.controller.sock.flush_buffer() + with pytest.raises(NotImplementedError): + fsamroy.controller.drive_axis_to_limit(0, "forward") diff --git a/tests/test_galil.py b/tests/test_galil.py index 5628ef8..2a2991f 100644 --- a/tests/test_galil.py +++ b/tests/test_galil.py @@ -3,26 +3,36 @@ from unittest import mock import pytest from utils import SocketMock -from ophyd_devices.galil.galil_ophyd import GalilMotor +from ophyd_devices.galil.galil_ophyd import GalilController, GalilMotor -@pytest.mark.parametrize( - "pos,msg,sign", - [ - (1, b" -12800\n\r", 1), - (-1, b" -12800\n\r", -1), - ], -) -def test_axis_get(pos, msg, sign): - leyey = GalilMotor( - "H", - name="leyey", - host="mpc2680.psi.ch", - port=8081, - sign=sign, - socket_cls=SocketMock, +@pytest.fixture(scope="function") +def leyey(): + GalilController._reset_controller() + leyey_motor = GalilMotor( + "H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock ) - leyey.controller.on() + leyey_motor.controller.on() + yield leyey_motor + leyey_motor.controller.off() + leyey_motor.controller._reset_controller() + + +@pytest.fixture(scope="function") +def leyex(): + GalilController._reset_controller() + leyex_motor = GalilMotor( + "A", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock + ) + leyex_motor.controller.on() + yield leyex_motor + leyex_motor.controller.off() + leyex_motor.controller._reset_controller() + + +@pytest.mark.parametrize("pos,msg,sign", [(1, b" -12800\n\r", 1), (-1, b" -12800\n\r", -1)]) +def test_axis_get(leyey, pos, msg, sign): + leyey.sign = sign leyey.controller.sock.flush_buffer() leyey.controller.sock.buffer_recv = msg val = leyey.read() @@ -44,21 +54,11 @@ def test_axis_get(pos, msg, sign): b"XQ#NEWPAR\r", b"MG_XQ0\r", ], - [ - b"1.00", - b"-1", - b":", - b":", - b":", - b":", - b"-1", - ], - ), + [b"1.00", b"-1", b":", b":", b":", b":", b"-1"], + ) ], ) -def test_axis_put(target_pos, socket_put_messages, socket_get_messages): - leyey = GalilMotor("H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock) - leyey.controller.on() +def test_axis_put(leyey, target_pos, socket_put_messages, socket_get_messages): leyey.controller.sock.flush_buffer() leyey.controller.sock.buffer_recv = socket_get_messages leyey.user_setpoint.put(target_pos) @@ -82,17 +82,7 @@ def test_axis_put(target_pos, socket_put_messages, socket_get_messages): b"MG_XQ2\r", b"MG _LRA, _LFA\r", ], - [ - b":", - b":", - b":", - b":", - b"0", - b"0", - b"-1", - b"-1", - b"1.000 0.000", - ], + [b":", b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.000 0.000"], ), ( 1, @@ -108,27 +98,15 @@ def test_axis_put(target_pos, socket_put_messages, socket_get_messages): b"MG_XQ2\r", b"MG _LRB, _LFB\r", ], - [ - b":", - b":", - b":", - b":", - b"0", - b"0", - b"-1", - b"-1", - b"0.000 1.000", - ], + [b":", b":", b":", b":", b"0", b"0", b"-1", b"-1", b"0.000 1.000"], ), ], ) -def test_drive_axis_to_limit(axis_nr, direction, socket_put_messages, socket_get_messages): - leyey = GalilMotor("A", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock) - leyey.controller.on() - leyey.controller.sock.flush_buffer() - leyey.controller.sock.buffer_recv = socket_get_messages - leyey.controller.drive_axis_to_limit(axis_nr, direction) - assert leyey.controller.sock.buffer_put == socket_put_messages +def test_drive_axis_to_limit(leyex, axis_nr, direction, socket_put_messages, socket_get_messages): + leyex.controller.sock.flush_buffer() + leyex.controller.sock.buffer_recv = socket_get_messages + leyex.controller.drive_axis_to_limit(axis_nr, direction) + assert leyex.controller.sock.buffer_put == socket_put_messages @pytest.mark.parametrize( @@ -146,16 +124,7 @@ def test_drive_axis_to_limit(axis_nr, direction, socket_put_messages, socket_get b"MG_XQ2\r", b"MG axisref[0]\r", ], - [ - b":", - b":", - b":", - b"0", - b"0", - b"-1", - b"-1", - b"1.00", - ], + [b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.00"], ), ( 1, @@ -169,23 +138,12 @@ def test_drive_axis_to_limit(axis_nr, direction, socket_put_messages, socket_get b"MG_XQ2\r", b"MG axisref[1]\r", ], - [ - b":", - b":", - b":", - b"0", - b"0", - b"-1", - b"-1", - b"1.00", - ], + [b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.00"], ), ], ) -def test_find_reference(axis_nr, socket_put_messages, socket_get_messages): - leyey = GalilMotor("A", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock) - leyey.controller.on() - leyey.controller.sock.flush_buffer() - leyey.controller.sock.buffer_recv = socket_get_messages - leyey.controller.find_reference(axis_nr) - assert leyey.controller.sock.buffer_put == socket_put_messages +def test_find_reference(leyex, axis_nr, socket_put_messages, socket_get_messages): + leyex.controller.sock.flush_buffer() + leyex.controller.sock.buffer_recv = socket_get_messages + leyex.controller.find_reference(axis_nr) + assert leyex.controller.sock.buffer_put == socket_put_messages diff --git a/tests/test_galil_flomni.py b/tests/test_galil_flomni.py new file mode 100644 index 0000000..cb7255a --- /dev/null +++ b/tests/test_galil_flomni.py @@ -0,0 +1,172 @@ +from unittest import mock + +import pytest +from utils import SocketMock + +from ophyd_devices.galil.fgalil_ophyd import FlomniGalilController, FlomniGalilMotor + + +@pytest.fixture(scope="function") +def leyey(): + FlomniGalilController._reset_controller() + leyey_motor = FlomniGalilMotor( + "H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock + ) + leyey_motor.controller.on() + yield leyey_motor + leyey_motor.controller.off() + leyey_motor.controller._reset_controller() + + +@pytest.fixture(scope="function") +def leyex(): + FlomniGalilController._reset_controller() + leyex_motor = FlomniGalilMotor( + "H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock + ) + leyex_motor.controller.on() + yield leyex_motor + leyex_motor.controller.off() + leyex_motor.controller._reset_controller() + + +@pytest.mark.parametrize("pos,msg,sign", [(1, b" -12800\n\r", 1), (-1, b" -12800\n\r", -1)]) +def test_axis_get(leyey, pos, msg, sign): + leyey.sign = sign + leyey.controller.sock.flush_buffer() + leyey.controller.sock.buffer_recv = msg + val = leyey.read() + assert val["leyey"]["value"] == pos + assert leyey.readback.get() == pos + + +@pytest.mark.parametrize( + "target_pos,socket_put_messages,socket_get_messages", + [ + ( + 0, + [ + b"MG allaxref\r", + b"MG_XQ0\r", + b"naxis=7\r", + b"ntarget=0.000\r", + b"movereq=1\r", + b"XQ#NEWPAR\r", + b"MG_XQ0\r", + ], + [b"1.00", b"-1", b":", b":", b":", b":", b"-1"], + ) + ], +) +def test_axis_put(leyey, target_pos, socket_put_messages, socket_get_messages): + leyey.controller.sock.flush_buffer() + leyey.controller.sock.buffer_recv = socket_get_messages + leyey.user_setpoint.put(target_pos) + assert leyey.controller.sock.buffer_put == socket_put_messages + + +@pytest.mark.parametrize( + "axis_nr,direction,socket_put_messages,socket_get_messages", + [ + ( + 0, + "forward", + [ + b"naxis=0\r", + b"ndir=1\r", + b"XQ#NEWPAR\r", + b"XQ#FES\r", + b"MG_XQ0\r", + b"MG _MOA\r", + b"MG_XQ0\r", + b"MG _MOA\r", + b"MG _LRA, _LFA\r", + ], + [b":", b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.000 0.000"], + ), + ( + 1, + "reverse", + [ + b"naxis=1\r", + b"ndir=-1\r", + b"XQ#NEWPAR\r", + b"XQ#FES\r", + b"MG_XQ0\r", + b"MG _MOB\r", + b"MG_XQ0\r", + b"MG _MOB\r", + b"MG _LRB, _LFB\r", + ], + [b":", b":", b":", b":", b"0", b"0", b"-1", b"-1", b"0.000 1.000"], + ), + ], +) +def test_drive_axis_to_limit(leyex, axis_nr, direction, socket_put_messages, socket_get_messages): + leyex.controller.sock.flush_buffer() + leyex.controller.sock.buffer_recv = socket_get_messages + leyex.controller.drive_axis_to_limit(axis_nr, direction) + assert leyex.controller.sock.buffer_put == socket_put_messages + + +@pytest.mark.parametrize( + "axis_nr,socket_put_messages,socket_get_messages", + [ + ( + 0, + [ + b"naxis=0\r", + b"XQ#NEWPAR\r", + b"XQ#FRM\r", + b"MG_XQ0\r", + b"MG _MOA\r", + b"MG_XQ0\r", + b"MG _MOA\r", + b"MG axisref[0]\r", + ], + [b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.00"], + ), + ( + 1, + [ + b"naxis=1\r", + b"XQ#NEWPAR\r", + b"XQ#FRM\r", + b"MG_XQ0\r", + b"MG _MOB\r", + b"MG_XQ0\r", + b"MG _MOB\r", + b"MG axisref[1]\r", + ], + [b":", b":", b":", b"0", b"0", b"-1", b"-1", b"1.00"], + ), + ], +) +def test_find_reference(leyex, axis_nr, socket_put_messages, socket_get_messages): + leyex.controller.sock.flush_buffer() + leyex.controller.sock.buffer_recv = socket_get_messages + leyex.controller.find_reference(axis_nr) + assert leyex.controller.sock.buffer_put == socket_put_messages + + +@pytest.mark.parametrize( + "axis_Id,socket_put_messages,socket_get_messages,triggered", + [ + ("A", [b"MG @IN[14]\r"], [b" 1.0000\n"], True), + ("B", [b"MG @IN[14]\r"], [b" 0.0000\n"], False), + ], +) +def test_fosaz_light_curtain_is_triggered( + axis_Id, socket_put_messages, socket_get_messages, triggered +): + """test that the light curtain is triggered""" + fosaz = FlomniGalilMotor( + axis_Id, name="fosaz", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock + ) + fosaz.controller.on() + fosaz.controller.sock.flush_buffer() + fosaz.controller.sock.buffer_recv = socket_get_messages + assert fosaz.controller.fosaz_light_curtain_is_triggered() == triggered + assert fosaz.controller.sock.buffer_put == socket_put_messages + fosaz.controller.off() + fosaz.controller._reset_controller()