refactor(psi-motor): cleanup and fix tests

This commit is contained in:
2025-11-12 07:57:49 +01:00
committed by Christian Appel
parent 3d0a2119db
commit 33aa4b6cca
2 changed files with 27 additions and 44 deletions

View File

@@ -4,6 +4,7 @@ This module extends the basic EpicsMotor with additional functionality. It
exposes additional parameters of the EPICS MotorRecord and provides a more exposes additional parameters of the EPICS MotorRecord and provides a more
detailed interface for motors using the new ECMC-based motion systems at PSI. detailed interface for motors using the new ECMC-based motion systems at PSI.
""" """
import functools import functools
from ophyd import Component as Cpt from ophyd import Component as Cpt
@@ -57,7 +58,7 @@ class EpicsMotor(OphydEpicsMotor):
before movement. Usage is the same as EpicsMotor. before movement. Usage is the same as EpicsMotor.
""" """
tolerated_alarm = AlarmSeverity.INVALID tolerated_alarm = AlarmSeverity.MINOR
motor_deadband = Cpt( motor_deadband = Cpt(
EpicsSignal, ".RDBD", auto_monitor=True, kind=Kind.omitted, doc="Retry Deadband (EGU)" EpicsSignal, ".RDBD", auto_monitor=True, kind=Kind.omitted, doc="Retry Deadband (EGU)"
@@ -90,12 +91,10 @@ class EpicsMotor(OphydEpicsMotor):
# Check if we are at limit switches # Check if we are at limit switches
if self.low_limit_switch.get(use_monitor=False) == 1: if self.low_limit_switch.get(use_monitor=False) == 1:
success = False success = False
exc = RuntimeError( exc = RuntimeError(f"Motor {self.name} hit the low limit switch during motion")
f"Motor {self.name} hit the low limit switch during motion")
if self.high_limit_switch.get(use_monitor=False) == 1: if self.high_limit_switch.get(use_monitor=False) == 1:
success = False success = False
exc = RuntimeError( exc = RuntimeError(f"Motor {self.name} hit the high limit switch during motion")
f"Motor {self.name} hit the high limit switch during motion")
# Check the severity of the alarm field after motion is complete. # Check the severity of the alarm field after motion is complete.
# If there is any alarm at all warn the user, and if the alarm is # If there is any alarm at all warn the user, and if the alarm is
@@ -106,8 +105,7 @@ class EpicsMotor(OphydEpicsMotor):
status = self.user_readback.alarm_status status = self.user_readback.alarm_status
if severity > self.tolerated_alarm: if severity > self.tolerated_alarm:
self.log.error( self.log.error(
"Motion failed: %s is in an alarm state " "Motion failed: %s is in an alarm state " "status=%s severity=%s",
"status=%s severity=%s",
self.name, self.name,
status, status,
severity, severity,
@@ -119,8 +117,7 @@ class EpicsMotor(OphydEpicsMotor):
) )
else: else:
self.log.warning( self.log.warning(
"Motor %s raised an alarm during motion " "Motor %s raised an alarm during motion " "status=%s severity %s",
"status=%s severity %s",
self.name, self.name,
status, status,
severity, severity,
@@ -138,17 +135,11 @@ class EpicsMotor(OphydEpicsMotor):
started = self._started_moving = not was_moving and self._moving started = self._started_moving = not was_moving and self._moving
self.log.debug( self.log.debug(
"[ts=%s] %s moving: %s (value=%s)", "[ts=%s] %s moving: %s (value=%s)", fmt_time(timestamp), self, self._moving, value
fmt_time(timestamp),
self,
self._moving,
value,
) )
if started: if started:
self._run_subs( self._run_subs(sub_type=self.SUB_START, timestamp=timestamp, value=value, **kwargs)
sub_type=self.SUB_START, timestamp=timestamp, value=value, **kwargs
)
if was_moving and not self._moving: if was_moving and not self._moving:
success, exc = self._check_motion_status() success, exc = self._check_motion_status()
@@ -159,9 +150,7 @@ class EpicsMotor(OphydEpicsMotor):
if success: if success:
self._run_subs(sub_type=self.SUB_DONE, timestamp=timestamp, value=value) self._run_subs(sub_type=self.SUB_DONE, timestamp=timestamp, value=value)
self._run_subs( self._run_subs(sub_type=self._SUB_REQ_DONE, success=success, timestamp=timestamp, **kwargs)
sub_type=self._SUB_REQ_DONE, success=success, timestamp=timestamp, **kwargs
)
self._reset_sub(self._SUB_REQ_DONE) self._reset_sub(self._SUB_REQ_DONE)
def move(self, position, wait=False, moved_cb=None, timeout=None, **kwargs) -> MoveStatus: def move(self, position, wait=False, moved_cb=None, timeout=None, **kwargs) -> MoveStatus:
@@ -197,9 +186,7 @@ class EpicsMotor(OphydEpicsMotor):
self._run_subs(sub_type=self._SUB_REQ_DONE, success=False) self._run_subs(sub_type=self._SUB_REQ_DONE, success=False)
self._reset_sub(self._SUB_REQ_DONE) self._reset_sub(self._SUB_REQ_DONE)
status = MoveStatus( status = MoveStatus(self, position, timeout=timeout, settle_time=self._settle_time)
self, position, timeout=timeout, settle_time=self._settle_time
)
if moved_cb is not None: if moved_cb is not None:
status.add_callback(functools.partial(moved_cb, obj=self)) status.add_callback(functools.partial(moved_cb, obj=self))
@@ -210,9 +197,9 @@ class EpicsMotor(OphydEpicsMotor):
status.set_finished() status.set_finished()
else: else:
if exception is None: if exception is None:
exception = UnknownStatusFailure( exception = UnknownStatusFailure(f"{self.name} failed to move to {position}")
f"{self.name} failed to move to {position}")
status.set_exception(exception) status.set_exception(exception)
self.subscribe(set_status, event_type=self._SUB_REQ_DONE, run=False) self.subscribe(set_status, event_type=self._SUB_REQ_DONE, run=False)
self.user_setpoint.put(position, wait=False) self.user_setpoint.put(position, wait=False)
@@ -224,6 +211,7 @@ class EpicsMotor(OphydEpicsMotor):
raise raise
return status return status
class EpicsMotorEC(EpicsMotor): class EpicsMotorEC(EpicsMotor):
"""Detailed ECMC EPICS motor class """Detailed ECMC EPICS motor class
@@ -276,9 +264,7 @@ class EpicsMotorEC(EpicsMotor):
if error: if error:
success = False success = False
message_text = self.message_text.get(use_monitor=False) message_text = self.message_text.get(use_monitor=False)
exception = RuntimeError( exception = RuntimeError(f"Motor {self.name} reported ECMC error '{message_text}'")
f"Motor {self.name} reported ECMC error '{message_text}'"
)
return success, exception return success, exception
def move(self, position, wait=False, **kwargs) -> MoveStatus: def move(self, position, wait=False, **kwargs) -> MoveStatus:
@@ -316,11 +302,9 @@ class EpicsMotorEC(EpicsMotor):
might need to push physical buttons. might need to push physical buttons.
""" """
# Reset the error # Reset the error
self.error_reset.set(1, settle_time=0.2).wait() self.error_reset.set(1, settle_time=0.2).wait(timeout=5) # block for 5s
# Check if it disappeared # Check if it disappeared
error = self.error.get(auto_monitor=False) error = self.error.get(auto_monitor=False)
if error: if error:
message_text = self.message_text.get(auto_monitor=False) message_text = self.message_text.get(auto_monitor=False)
raise RuntimeError( raise RuntimeError(f"Failed to reset axis, still in error '{message_text}'")
f"Failed to reset axis, still in error '{message_text}'"
)

View File

@@ -83,7 +83,7 @@ def test_epics_motor_move(mock_epics_motor):
assert motor.user_readback.get() == 5 assert motor.user_readback.get() == 5
assert motor.user_setpoint.get() == 5 assert motor.user_setpoint.get() == 5
# Now this should raise... in theory... # Now this should raise...
motor.user_readback._read_pv.set_severity(ophyd.utils.epics_pvs.AlarmSeverity.MAJOR) motor.user_readback._read_pv.set_severity(ophyd.utils.epics_pvs.AlarmSeverity.MAJOR)
motor.user_readback._metadata["severity"] = ophyd.utils.epics_pvs.AlarmSeverity.MAJOR motor.user_readback._metadata["severity"] = ophyd.utils.epics_pvs.AlarmSeverity.MAJOR
motor.user_readback._read_pv.set_alarm_status(ophyd.utils.epics_pvs.AlarmStatus.READ) motor.user_readback._read_pv.set_alarm_status(ophyd.utils.epics_pvs.AlarmStatus.READ)
@@ -98,12 +98,10 @@ def test_epics_motor_move(mock_epics_motor):
assert status.done is False assert status.done is False
motor.user_readback._read_pv.mock_data = 10 motor.user_readback._read_pv.mock_data = 10
motor.motor_done_move._read_pv.mock_data = 1 motor.motor_done_move._read_pv.mock_data = 1
status.wait(timeout=5) with pytest.raises(RuntimeError):
assert status.done is True status.wait(timeout=5)
assert status.done is True
# But it only raises if we do the following assert isinstance(status.exception(), RuntimeError)
motor.tolerated_alarm = ophyd.utils.epics_pvs.AlarmSeverity.MINOR
status = motor.move(6)
def test_move_epics_motor_ec(mock_epics_motor_ec): def test_move_epics_motor_ec(mock_epics_motor_ec):
@@ -130,7 +128,7 @@ def test_move_epics_motor_ec(mock_epics_motor_ec):
status.wait(timeout=5) status.wait(timeout=5)
assert status.done is True assert status.done is True
# If error exists, this should log a warning, and PLC logic should try to remove it itself # If error exists, this should raise a warning
motor_ec.error._read_pv.mock_data = 1 motor_ec.error._read_pv.mock_data = 1
status = motor_ec.move(8) status = motor_ec.move(8)
motor_ec.motor_done_move._read_pv.mock_data = 0 motor_ec.motor_done_move._read_pv.mock_data = 0
@@ -138,11 +136,12 @@ def test_move_epics_motor_ec(mock_epics_motor_ec):
assert mock_log_warning.call_count == 1 assert mock_log_warning.call_count == 1
motor_ec.user_readback._read_pv.mock_data = 8 motor_ec.user_readback._read_pv.mock_data = 8
motor_ec.motor_done_move._read_pv.mock_data = 1 motor_ec.motor_done_move._read_pv.mock_data = 1
status.wait(timeout=5) with pytest.raises(RuntimeError):
assert status.done is True status.wait(timeout=5)
motor_ec.error._read_pv.mock_data = 0 assert status.done is True
assert isinstance(status.exception(), RuntimeError)
# Attempting to move above active HLS should raise a warning motor_ec.error._read_pv.mock_data = 0
# Note: Position has to be > current position from 8 to 9... # Note: Position has to be > current position from 8 to 9...
motor_ec.high_interlock._read_pv.mock_data = 1 motor_ec.high_interlock._read_pv.mock_data = 1
status = motor_ec.move(9) status = motor_ec.move(9)