fix(psi-motor): Fix wrapper for EpicsISignalWithCheck

This commit is contained in:
2025-12-16 10:31:21 +01:00
parent 288de5b95b
commit 5dc3d09cc2

View File

@@ -6,6 +6,7 @@ detailed interface for motors using the new ECMC-based motion systems at PSI.
""" """
import functools import functools
import time
import numpy as np import numpy as np
from ophyd import Component as Cpt from ophyd import Component as Cpt
@@ -15,6 +16,7 @@ from ophyd.status import MoveStatus
from ophyd.utils.epics_pvs import AlarmSeverity, fmt_time from ophyd.utils.epics_pvs import AlarmSeverity, fmt_time
from ophyd.utils.errors import UnknownStatusFailure from ophyd.utils.errors import UnknownStatusFailure
from ophyd_devices import StatusBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
@@ -50,6 +52,49 @@ class EpicsSignalWithCheck(EpicsSignal):
if not np.isclose(value, new_value): if not np.isclose(value, new_value):
raise ValueError(f"Failed to set signal {self.name} to value: {value}.") raise ValueError(f"Failed to set signal {self.name} to value: {value}.")
def set(self, value, *, timeout=None, settle_time=None, **kwargs):
"""Set the value of the Signal and return a Status object.
Returns
-------
st : Status
This status object will be finished upon return in the
case of basic soft Signals
"""
def set_thread():
try:
self._set_and_wait(value, timeout, **kwargs)
except TimeoutError as e:
raised_exception = e
except Exception as e:
raised_exception = e
else:
raised_exception = None
if settle_time is not None:
self.log.debug("settling for %d seconds", settle_time)
time.sleep(settle_time)
finally:
# keep a local reference to avoid any GC shenanigans
th = self._set_thread
# these two must be in this order to avoid a race condition
self._set_thread = None
if raised_exception is not None:
st.set_exception(raised_exception)
else:
st.set_finished()
del th
if self._set_thread is not None:
raise RuntimeError("Another set() call is still in progress " f"for {self.name}")
st = StatusBase(self)
self._status = st
self._set_thread = self.cl.thread_class(target=set_thread)
self._set_thread.daemon = True
self._set_thread.start()
return self._status
class EpicsMotor(OphydEpicsMotor): class EpicsMotor(OphydEpicsMotor):
""" """