diff --git a/ophyd_devices/devices/psi_motor.py b/ophyd_devices/devices/psi_motor.py index 7e49577..2b444bc 100644 --- a/ophyd_devices/devices/psi_motor.py +++ b/ophyd_devices/devices/psi_motor.py @@ -55,7 +55,7 @@ class EpicsMotor(OphydEpicsMotor): doc="Enable control. Either 0 (disabled) or 1 (enabled).", ) - def move(self, position, wait=True, **kwargs) -> MoveStatus: + def move(self, position, wait=False, **kwargs) -> MoveStatus: """Extended move function with a few sanity checks Note that the default EpicsMotor only supports the 'GO' movement mode. @@ -63,7 +63,7 @@ class EpicsMotor(OphydEpicsMotor): """ # Reset SPMG before move if self.motor_mode.get() != SpmgStates.GO: - self.motor_mode.set(SpmgStates.GO).wait() + self.motor_mode.set(SpmgStates.GO).wait(timeout=5) # Warn if EPIC motorRecord claims an error (it's not easy to reset) status = self.motor_status.get() if status: @@ -166,7 +166,7 @@ class EpicsMotorEC(EpicsMotor): high_limit_travel = Cpt(TravelLimitsECMC, ".HLM", kind="omitted", auto_monitor=True) low_limit_travel = Cpt(TravelLimitsECMC, ".LLM", kind="omitted", auto_monitor=True) - def move(self, position, wait=True, **kwargs) -> MoveStatus: + def move(self, position, wait=False, **kwargs) -> MoveStatus: """Extended move function with a few sanity checks Check ECMC error and interlocks. They may get cleared by the move command. If not, exception @@ -180,7 +180,7 @@ class EpicsMotorEC(EpicsMotor): # Warn if trying to move beyond an active limit if self.high_interlock.get(use_monitor=False) and position > self.position: self.log.warning("Attempting to move above active HLS or Ilock") - if self.high_interlock.get(use_monitor=False) and position < self.position: + if self.low_interlock.get(use_monitor=False) and position < self.position: self.log.warning("Attempting to move below active LLS or Ilock") return super().move(position, wait, **kwargs) diff --git a/tests/test_psi_motors.py b/tests/test_psi_motors.py new file mode 100644 index 0000000..c7f76e6 --- /dev/null +++ b/tests/test_psi_motors.py @@ -0,0 +1,168 @@ +"""This module tests the EpicsMotor and EpicsMotorEC classes for custom +PSI motor integration from the ophyd_devices.devices.psi_motor module.""" + +from __future__ import annotations + +import threading +from unittest import mock + +import ophyd +import pytest + +from ophyd_devices.devices.psi_motor import EpicsMotor, EpicsMotorEC, SpmgStates +from ophyd_devices.tests.utils import MockPV, patch_dual_pvs + + +@pytest.fixture(scope="function") +def mock_epics_motor(): + """Fixture to create a mock EpicsMotor instance.""" + name = "test_motor" + prefix = "SIM:MOTOR" + with mock.patch.object(ophyd, "cl") as mock_cl: + mock_cl.get_pv = MockPV + mock_cl.thread_class = threading.Thread + motor = EpicsMotor(name=name, prefix=prefix) + motor.wait_for_connection() + patch_dual_pvs(motor) + yield motor + + +@pytest.fixture(scope="function") +def mock_epics_motor_ec(): + """Fixture to create a mock EpicsMotorEC instance.""" + name = "test_motor_ec" + prefix = "SIM:MOTOR:EC" + with mock.patch.object(ophyd, "cl") as mock_cl: + mock_cl.get_pv = MockPV + mock_cl.thread_class = threading.Thread + motor_ec = EpicsMotorEC(name=name, prefix=prefix) + motor_ec.wait_for_connection() + patch_dual_pvs(motor_ec) + yield motor_ec + + +def test_epics_motor_limits_raise(mock_epics_motor): + """Test the move method of EpicsMotor.""" + motor = mock_epics_motor + motor.user_setpoint._metadata["lower_ctrl_limit"] = -10 + motor.user_setpoint._metadata["upper_ctrl_limit"] = 10 + motor.low_limit_travel.put(-10) + motor.high_limit_travel.put(10) + with pytest.raises(ophyd.utils.LimitError): + motor.move(-15) + + +def test_epics_motor_move(mock_epics_motor): + """Test the move method of EpicsMotor.""" + motor = mock_epics_motor + assert motor.user_readback.get() == 0 + assert motor.user_setpoint.get() == 0 + motor.user_setpoint._metadata["lower_ctrl_limit"] = -10 + motor.user_setpoint._metadata["upper_ctrl_limit"] = 10 + motor.motor_mode.put(SpmgStates.GO) + # Now this should raise... in theory... + motor.user_readback._read_pv.set_severity(ophyd.utils.epics_pvs.AlarmSeverity.NO_ALARM) + motor.user_readback._metadata["severity"] = ophyd.utils.epics_pvs.AlarmSeverity.NO_ALARM + motor.user_readback._read_pv.set_alarm_status(ophyd.utils.epics_pvs.AlarmStatus.NO_ALARM) + motor.user_readback._metadata["status"] = ophyd.utils.epics_pvs.AlarmStatus.NO_ALARM + # Set alarm of motor to + status = motor.move(5) + assert status.done is False + assert motor.user_readback.get() == 0 + assert motor.user_setpoint.get() == 5 + motor.motor_done_move._read_pv.mock_data = 0 + for ii in range(1, 6): + motor.user_readback._read_pv.mock_data = ii + if ii == 5: + motor.motor_done_move._read_pv.mock_data = 1 + else: + motor.motor_done_move._read_pv.mock_data = 0 + + status.wait(timeout=5) + assert status.done is True + assert motor.user_readback.get() == 5 + assert motor.user_setpoint.get() == 5 + + # Now this should raise... in theory... + motor.user_readback._read_pv.set_severity(ophyd.utils.epics_pvs.AlarmSeverity.MAJOR) + motor.user_readback._metadata["severity"] = ophyd.utils.epics_pvs.AlarmSeverity.MAJOR + motor.user_readback._read_pv.set_alarm_status(ophyd.utils.epics_pvs.AlarmStatus.READ) + motor.user_readback._metadata["status"] = ophyd.utils.epics_pvs.AlarmStatus.READ + status = motor.move(10) + motor.motor_done_move._read_pv.mock_data = 0 + assert status.done is False + assert motor.user_readback.get() == 5 + assert motor.user_setpoint.get() == 10 + motor.user_readback._read_pv.mock_data = 7 + motor.motor_done_move._read_pv.mock_data = 0 + assert status.done is False + motor.user_readback._read_pv.mock_data = 10 + motor.motor_done_move._read_pv.mock_data = 1 + status.wait(timeout=5) + assert status.done is True + + # But it only raises if we do the following + motor.tolerated_alarm = ophyd.utils.epics_pvs.AlarmSeverity.MINOR + status = motor.move(6) + + +def test_move_epics_motor_ec(mock_epics_motor_ec): + """Test the move method of EpicsMotorEC.""" + motor_ec = mock_epics_motor_ec + motor_ec.user_setpoint._metadata["lower_ctrl_limit"] = -10 + motor_ec.user_setpoint._metadata["upper_ctrl_limit"] = 10 + motor_ec.user_readback._read_pv.set_severity(ophyd.utils.epics_pvs.AlarmSeverity.NO_ALARM) + motor_ec.user_readback._metadata["severity"] = ophyd.utils.epics_pvs.AlarmSeverity.NO_ALARM + motor_ec.user_readback._read_pv.set_alarm_status(ophyd.utils.epics_pvs.AlarmStatus.NO_ALARM) + motor_ec.user_readback._metadata["status"] = ophyd.utils.epics_pvs.AlarmStatus.NO_ALARM + + assert motor_ec.user_readback.get() == 0 + assert motor_ec.user_setpoint.get() == 0 + + motor_ec.error._read_pv.mock_data = 0 + with mock.patch.object(motor_ec.log, "warning") as mock_log_warning: + # Should not log any warning + status = motor_ec.move(5) + motor_ec.motor_done_move._read_pv.mock_data = 0 + assert mock_log_warning.call_count == 0 + motor_ec.user_readback._read_pv.mock_data = 5 + motor_ec.motor_done_move._read_pv.mock_data = 1 + status.wait(timeout=5) + assert status.done is True + + # If error exists, this should log a warning, and PLC logic should try to remove it itself + motor_ec.error._read_pv.mock_data = 1 + status = motor_ec.move(8) + motor_ec.motor_done_move._read_pv.mock_data = 0 + assert status.done is False + assert mock_log_warning.call_count == 1 + motor_ec.user_readback._read_pv.mock_data = 8 + motor_ec.motor_done_move._read_pv.mock_data = 1 + status.wait(timeout=5) + assert status.done is True + motor_ec.error._read_pv.mock_data = 0 + + # Attempting to move above active HLS should raise a warning + # Note: Position has to be > current position from 8 to 9... + motor_ec.high_interlock._read_pv.mock_data = 1 + status = motor_ec.move(9) + motor_ec.motor_done_move._read_pv.mock_data = 0 + assert status.done is False + assert mock_log_warning.call_count == 2 + motor_ec.user_readback._read_pv.mock_data = 2 + motor_ec.motor_done_move._read_pv.mock_data = 1 + motor_ec.high_interlock._read_pv.mock_data = 0 + status.wait(timeout=5) + assert status.done is True + + # Attempting to move below active LLS should raise a warning + motor_ec.low_interlock._read_pv.mock_data = 1 + status = motor_ec.move(-2) + motor_ec.motor_done_move._read_pv.mock_data = 0 + assert status.done is False + assert mock_log_warning.call_count == 3 + motor_ec.user_readback._read_pv.mock_data = -2 + motor_ec.motor_done_move._read_pv.mock_data = 1 + status.wait(timeout=5) + assert status.done is True + motor_ec.low_interlock._read_pv.mock_data = 0