diff --git a/ophyd_devices/devices/psi_motor.py b/ophyd_devices/devices/psi_motor.py index a33f855..29b792b 100644 --- a/ophyd_devices/devices/psi_motor.py +++ b/ophyd_devices/devices/psi_motor.py @@ -4,6 +4,7 @@ This module extends the basic EpicsMotor with additional functionality. It exposes additional parameters of the EPICS MotorRecord and provides a more detailed interface for motors using the new ECMC-based motion systems at PSI. """ + import functools from ophyd import Component as Cpt @@ -57,7 +58,7 @@ class EpicsMotor(OphydEpicsMotor): before movement. Usage is the same as EpicsMotor. """ - tolerated_alarm = AlarmSeverity.INVALID + tolerated_alarm = AlarmSeverity.MINOR motor_deadband = Cpt( EpicsSignal, ".RDBD", auto_monitor=True, kind=Kind.omitted, doc="Retry Deadband (EGU)" @@ -90,12 +91,10 @@ class EpicsMotor(OphydEpicsMotor): # Check if we are at limit switches if self.low_limit_switch.get(use_monitor=False) == 1: success = False - exc = RuntimeError( - f"Motor {self.name} hit the low limit switch during motion") + exc = RuntimeError(f"Motor {self.name} hit the low limit switch during motion") if self.high_limit_switch.get(use_monitor=False) == 1: success = False - exc = RuntimeError( - f"Motor {self.name} hit the high limit switch during motion") + exc = RuntimeError(f"Motor {self.name} hit the high limit switch during motion") # Check the severity of the alarm field after motion is complete. # If there is any alarm at all warn the user, and if the alarm is @@ -106,8 +105,7 @@ class EpicsMotor(OphydEpicsMotor): status = self.user_readback.alarm_status if severity > self.tolerated_alarm: self.log.error( - "Motion failed: %s is in an alarm state " - "status=%s severity=%s", + "Motion failed: %s is in an alarm state " "status=%s severity=%s", self.name, status, severity, @@ -119,8 +117,7 @@ class EpicsMotor(OphydEpicsMotor): ) else: self.log.warning( - "Motor %s raised an alarm during motion " - "status=%s severity %s", + "Motor %s raised an alarm during motion " "status=%s severity %s", self.name, status, severity, @@ -138,17 +135,11 @@ class EpicsMotor(OphydEpicsMotor): started = self._started_moving = not was_moving and self._moving self.log.debug( - "[ts=%s] %s moving: %s (value=%s)", - fmt_time(timestamp), - self, - self._moving, - value, + "[ts=%s] %s moving: %s (value=%s)", fmt_time(timestamp), self, self._moving, value ) if started: - self._run_subs( - sub_type=self.SUB_START, timestamp=timestamp, value=value, **kwargs - ) + self._run_subs(sub_type=self.SUB_START, timestamp=timestamp, value=value, **kwargs) if was_moving and not self._moving: success, exc = self._check_motion_status() @@ -159,9 +150,7 @@ class EpicsMotor(OphydEpicsMotor): if success: self._run_subs(sub_type=self.SUB_DONE, timestamp=timestamp, value=value) - self._run_subs( - sub_type=self._SUB_REQ_DONE, success=success, timestamp=timestamp, **kwargs - ) + self._run_subs(sub_type=self._SUB_REQ_DONE, success=success, timestamp=timestamp, **kwargs) self._reset_sub(self._SUB_REQ_DONE) def move(self, position, wait=False, moved_cb=None, timeout=None, **kwargs) -> MoveStatus: @@ -197,9 +186,7 @@ class EpicsMotor(OphydEpicsMotor): self._run_subs(sub_type=self._SUB_REQ_DONE, success=False) self._reset_sub(self._SUB_REQ_DONE) - status = MoveStatus( - self, position, timeout=timeout, settle_time=self._settle_time - ) + status = MoveStatus(self, position, timeout=timeout, settle_time=self._settle_time) if moved_cb is not None: status.add_callback(functools.partial(moved_cb, obj=self)) @@ -210,9 +197,9 @@ class EpicsMotor(OphydEpicsMotor): status.set_finished() else: if exception is None: - exception = UnknownStatusFailure( - f"{self.name} failed to move to {position}") + exception = UnknownStatusFailure(f"{self.name} failed to move to {position}") status.set_exception(exception) + self.subscribe(set_status, event_type=self._SUB_REQ_DONE, run=False) self.user_setpoint.put(position, wait=False) @@ -224,6 +211,7 @@ class EpicsMotor(OphydEpicsMotor): raise return status + class EpicsMotorEC(EpicsMotor): """Detailed ECMC EPICS motor class @@ -276,9 +264,7 @@ class EpicsMotorEC(EpicsMotor): if error: success = False message_text = self.message_text.get(use_monitor=False) - exception = RuntimeError( - f"Motor {self.name} reported ECMC error '{message_text}'" - ) + exception = RuntimeError(f"Motor {self.name} reported ECMC error '{message_text}'") return success, exception def move(self, position, wait=False, **kwargs) -> MoveStatus: @@ -316,11 +302,9 @@ class EpicsMotorEC(EpicsMotor): might need to push physical buttons. """ # Reset the error - self.error_reset.set(1, settle_time=0.2).wait() + self.error_reset.set(1, settle_time=0.2).wait(timeout=5) # block for 5s # Check if it disappeared error = self.error.get(auto_monitor=False) if error: message_text = self.message_text.get(auto_monitor=False) - raise RuntimeError( - f"Failed to reset axis, still in error '{message_text}'" - ) + raise RuntimeError(f"Failed to reset axis, still in error '{message_text}'") diff --git a/tests/test_psi_motors.py b/tests/test_psi_motors.py index c7f76e6..4d61509 100644 --- a/tests/test_psi_motors.py +++ b/tests/test_psi_motors.py @@ -83,7 +83,7 @@ def test_epics_motor_move(mock_epics_motor): assert motor.user_readback.get() == 5 assert motor.user_setpoint.get() == 5 - # Now this should raise... in theory... + # Now this should raise... motor.user_readback._read_pv.set_severity(ophyd.utils.epics_pvs.AlarmSeverity.MAJOR) motor.user_readback._metadata["severity"] = ophyd.utils.epics_pvs.AlarmSeverity.MAJOR motor.user_readback._read_pv.set_alarm_status(ophyd.utils.epics_pvs.AlarmStatus.READ) @@ -98,12 +98,10 @@ def test_epics_motor_move(mock_epics_motor): assert status.done is False motor.user_readback._read_pv.mock_data = 10 motor.motor_done_move._read_pv.mock_data = 1 - status.wait(timeout=5) - assert status.done is True - - # But it only raises if we do the following - motor.tolerated_alarm = ophyd.utils.epics_pvs.AlarmSeverity.MINOR - status = motor.move(6) + with pytest.raises(RuntimeError): + status.wait(timeout=5) + assert status.done is True + assert isinstance(status.exception(), RuntimeError) def test_move_epics_motor_ec(mock_epics_motor_ec): @@ -130,7 +128,7 @@ def test_move_epics_motor_ec(mock_epics_motor_ec): status.wait(timeout=5) assert status.done is True - # If error exists, this should log a warning, and PLC logic should try to remove it itself + # If error exists, this should raise a warning motor_ec.error._read_pv.mock_data = 1 status = motor_ec.move(8) motor_ec.motor_done_move._read_pv.mock_data = 0 @@ -138,11 +136,12 @@ def test_move_epics_motor_ec(mock_epics_motor_ec): assert mock_log_warning.call_count == 1 motor_ec.user_readback._read_pv.mock_data = 8 motor_ec.motor_done_move._read_pv.mock_data = 1 - status.wait(timeout=5) - assert status.done is True - motor_ec.error._read_pv.mock_data = 0 + with pytest.raises(RuntimeError): + status.wait(timeout=5) + assert status.done is True + assert isinstance(status.exception(), RuntimeError) - # Attempting to move above active HLS should raise a warning + motor_ec.error._read_pv.mock_data = 0 # Note: Position has to be > current position from 8 to 9... motor_ec.high_interlock._read_pv.mock_data = 1 status = motor_ec.move(9)