From 3d0a2119db61d01bfb05687f883b7574c7f1c6fb Mon Sep 17 00:00:00 2001 From: Xiaoqiang Wang Date: Fri, 24 Oct 2025 13:38:01 +0200 Subject: [PATCH] raise exception on failed motion --- ophyd_devices/devices/psi_motor.py | 157 ++++++++++++++++++++++++++--- 1 file changed, 145 insertions(+), 12 deletions(-) diff --git a/ophyd_devices/devices/psi_motor.py b/ophyd_devices/devices/psi_motor.py index 302e815..a33f855 100644 --- a/ophyd_devices/devices/psi_motor.py +++ b/ophyd_devices/devices/psi_motor.py @@ -4,12 +4,14 @@ This module extends the basic EpicsMotor with additional functionality. It exposes additional parameters of the EPICS MotorRecord and provides a more detailed interface for motors using the new ECMC-based motion systems at PSI. """ +import functools from ophyd import Component as Cpt from ophyd import EpicsMotor as OphydEpicsMotor from ophyd import EpicsSignal, EpicsSignalRO, Kind from ophyd.status import MoveStatus -from ophyd.utils.epics_pvs import AlarmSeverity +from ophyd.utils.epics_pvs import AlarmSeverity, fmt_time +from ophyd.utils.errors import UnknownStatusFailure class SpmgStates: @@ -81,7 +83,88 @@ class EpicsMotor(OphydEpicsMotor): high_limit_travel = Cpt(EpicsSignalWithCheck, ".HLM", kind=Kind.omitted, auto_monitor=True) low_limit_travel = Cpt(EpicsSignalWithCheck, ".LLM", kind=Kind.omitted, auto_monitor=True) - def move(self, position, wait=False, **kwargs) -> MoveStatus: + def _check_motion_status(self) -> tuple[bool, Exception | None]: + """Check if the motion finished successfully""" + success = True + exc = None + # Check if we are at limit switches + if self.low_limit_switch.get(use_monitor=False) == 1: + success = False + exc = RuntimeError( + f"Motor {self.name} hit the low limit switch during motion") + if self.high_limit_switch.get(use_monitor=False) == 1: + success = False + exc = RuntimeError( + f"Motor {self.name} hit the high limit switch during motion") + + # Check the severity of the alarm field after motion is complete. + # If there is any alarm at all warn the user, and if the alarm is + # greater than what is tolerated, mark the move as unsuccessful + severity = self.user_readback.alarm_severity + + if severity != AlarmSeverity.NO_ALARM: + status = self.user_readback.alarm_status + if severity > self.tolerated_alarm: + self.log.error( + "Motion failed: %s is in an alarm state " + "status=%s severity=%s", + self.name, + status, + severity, + ) + success = False + exc = RuntimeError( + f"Motor {self.name} is in an alarm state " + f"status={status} severity={severity}" + ) + else: + self.log.warning( + "Motor %s raised an alarm during motion " + "status=%s severity %s", + self.name, + status, + severity, + ) + + return success, exc + + def _move_changed(self, timestamp=None, value=None, sub_type=None, **kwargs): + """Callback from EPICS, indicating that movement status has changed""" + was_moving = self._moving + self._moving = value != 1 + + started = False + if not self._started_moving: + started = self._started_moving = not was_moving and self._moving + + self.log.debug( + "[ts=%s] %s moving: %s (value=%s)", + fmt_time(timestamp), + self, + self._moving, + value, + ) + + if started: + self._run_subs( + sub_type=self.SUB_START, timestamp=timestamp, value=value, **kwargs + ) + + if was_moving and not self._moving: + success, exc = self._check_motion_status() + self._done_moving(success=success, timestamp=timestamp, value=value, exception=exc) + + def _done_moving(self, success=True, timestamp=None, value=None, **kwargs): + """Overload PositionerBase._done_moving to pass kwargs to _SUB_REQ_DONE callbacks.""" + if success: + self._run_subs(sub_type=self.SUB_DONE, timestamp=timestamp, value=value) + + self._run_subs( + sub_type=self._SUB_REQ_DONE, success=success, timestamp=timestamp, **kwargs + ) + self._reset_sub(self._SUB_REQ_DONE) + + def move(self, position, wait=False, moved_cb=None, timeout=None, **kwargs) -> MoveStatus: """Extended move function with a few sanity checks Note that the default EpicsMotor only supports the 'GO' movement mode. @@ -102,8 +185,44 @@ class EpicsMotor(OphydEpicsMotor): if self.low_limit_switch.get(use_monitor=False) and position < self.position: self.log.warning("Attempting to move below active LLS") - return super().move(position, wait, **kwargs) + self._started_moving = False + # Modified PositionerBase.move method to call set_finished/set_exception instead of + # deprecated _finished on the status object. + if timeout is None: + timeout = self._timeout + + self.check_value(position) + + self._run_subs(sub_type=self._SUB_REQ_DONE, success=False) + self._reset_sub(self._SUB_REQ_DONE) + + status = MoveStatus( + self, position, timeout=timeout, settle_time=self._settle_time + ) + + if moved_cb is not None: + status.add_callback(functools.partial(moved_cb, obj=self)) + # the status object will run this callback when finished + + def set_status(success, exception=None, **kwargs): + if success: + status.set_finished() + else: + if exception is None: + exception = UnknownStatusFailure( + f"{self.name} failed to move to {position}") + status.set_exception(exception) + self.subscribe(set_status, event_type=self._SUB_REQ_DONE, run=False) + + self.user_setpoint.put(position, wait=False) + try: + if wait: + status.wait(timeout) + except KeyboardInterrupt: + self.stop() + raise + return status class EpicsMotorEC(EpicsMotor): """Detailed ECMC EPICS motor class @@ -114,9 +233,10 @@ class EpicsMotorEC(EpicsMotor): """ USER_ACCESS = ["reset"] - motor_enable_readback = Cpt( + motor_enabled = Cpt( EpicsSignalRO, "-EnaAct", + auto_monitor=True, kind=Kind.omitted, doc="[ECMC] Reflects whether the axis is enabled in the hardware level.", ) @@ -143,10 +263,23 @@ class EpicsMotorEC(EpicsMotor): error = Cpt( EpicsSignalRO, "-ErrId", auto_monitor=True, kind=Kind.omitted, doc="[ECMC] Error ID" ) - error_msg = Cpt( - EpicsSignalRO, "-MsgTxt", auto_monitor=True, kind=Kind.omitted, doc="[ECMC] Error message" - ) error_reset = Cpt(EpicsSignal, "-ErrRst", put_complete=True, kind=Kind.omitted) + message_text = Cpt( + EpicsSignalRO, "-MsgTxt", auto_monitor=True, kind=Kind.omitted, doc="[ECMC] Message text" + ) + + def _check_motion_status(self) -> tuple[bool, Exception | None]: + success, exception = super()._check_motion_status() + if success: + # Additionally check for ECMC errors + error = self.error.get(use_monitor=False) + if error: + success = False + message_text = self.message_text.get(use_monitor=False) + exception = RuntimeError( + f"Motor {self.name} reported ECMC error '{message_text}'" + ) + return success, exception def move(self, position, wait=False, **kwargs) -> MoveStatus: """Extended move function with a few sanity checks @@ -157,7 +290,7 @@ class EpicsMotorEC(EpicsMotor): # Check ECMC error status before move error = self.error.get() if error: - self.log.warning(f"Motor is in error state with message: '{self.error_msg.get()}'") + self.log.warning(f"Motor is in error state with message: '{self.message_text.get()}'") # Warn if trying to move beyond an active limit if self.high_interlock.get(use_monitor=False) and position > self.position: @@ -167,8 +300,8 @@ class EpicsMotorEC(EpicsMotor): return super().move(position, wait, **kwargs) - def reset(self): - """Resets an ECMC axis + def reset_error(self): + """Resets an ECMC axis error. Recovers an ECMC axis from a previous error. Note that this does not solve the cause of the error, that you'll have to do yourself. @@ -187,7 +320,7 @@ class EpicsMotorEC(EpicsMotor): # Check if it disappeared error = self.error.get(auto_monitor=False) if error: - error_msg = self.error_msg.get(auto_monitor=False) + message_text = self.message_text.get(auto_monitor=False) raise RuntimeError( - f"Failed to reset axis, still in error with id: '{error}, msg: {error_msg}'" + f"Failed to reset axis, still in error '{message_text}'" )