fix(psi-motor): Add new user motor imlementation that checks if the IOC is enabled.

This commit is contained in:
2025-12-02 11:11:38 +01:00
committed by Christian Appel
parent fa51ffe33a
commit 71a9b3c103
4 changed files with 74 additions and 66 deletions

View File

@@ -1,7 +1,6 @@
from ophyd.quadem import QuadEM from ophyd.quadem import QuadEM
from ophyd.sim import SynAxis, SynPeriodicSignal, SynSignal from ophyd.sim import SynAxis, SynPeriodicSignal, SynSignal
from .epics_motor_ex import EpicsMotorEx
from .psi_motor import EpicsMotor, EpicsMotorEC from .psi_motor import EpicsMotor, EpicsMotorEC
from .sls_devices import SLSInfo, SLSOperatorMessages from .sls_devices import SLSInfo, SLSOperatorMessages
from .SpmBase import SpmBase from .SpmBase import SpmBase

View File

@@ -1,66 +1,18 @@
from ophyd import Component as Cpt # """Module extending EpicsMotor for VME based User motors with extra configuration fields."""
from ophyd import EpicsMotor, EpicsSignal
# from ophyd import Component as Cpt
# from ophyd import EpicsSignal
# # from ophyd_devices.devices.psi_motor import EpicsUserMotors
class EpicsMotorEx(EpicsMotor): # class EpicsMotorEx(EpicsUserMotors):
"""Extend EpicsMotor with extra configuration fields. # """
motor_done_move # EpicsMotor that extends a VME based user motor with additional configuration signals,
motor_is_moving # motor_resolution, base_velocity and backlash_distance.
""" # """
# configuration # # configuration
motor_resolution = Cpt(EpicsSignal, ".MRES", kind="config", auto_monitor=True) # motor_resolution = Cpt(EpicsSignal, ".MRES", kind="config", auto_monitor=True)
base_velocity = Cpt(EpicsSignal, ".VBAS", kind="config", auto_monitor=True) # base_velocity = Cpt(EpicsSignal, ".VBAS", kind="config", auto_monitor=True)
backlash_distance = Cpt(EpicsSignal, ".BDST", kind="config", auto_monitor=True) # backlash_distance = Cpt(EpicsSignal, ".BDST", kind="config", auto_monitor=True)
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
**kwargs,
):
# get configuration attributes from kwargs and then remove them
attrs = {}
for key, value in kwargs.items():
if hasattr(EpicsMotorEx, key) and isinstance(getattr(EpicsMotorEx, key), Cpt):
attrs[key] = value
for key in attrs:
kwargs.pop(key)
super().__init__(
prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
# set configuration attributes
for key, value in attrs.items():
# print out attributes that are being configured
print("setting ", key, "=", value)
getattr(self, key).put(value)
# self.motor_done_move.subscribe(self._progress_update, run=False)
# def kickoff(self) -> DeviceStatus:
# status = DeviceStatus(self)
# self.move(
# self._kickoff_params.get("position"),
# wait = False
# )
# return status
# def _progress_update(self, value, **kwargs) -> None:
# self._run_subs(
# sub_type=self.SUB_PROGRESS,
# value=value ,
# done= 1,
# )

View File

@@ -14,6 +14,8 @@ from ophyd.status import MoveStatus
from ophyd.utils.epics_pvs import AlarmSeverity, fmt_time from ophyd.utils.epics_pvs import AlarmSeverity, fmt_time
from ophyd.utils.errors import UnknownStatusFailure from ophyd.utils.errors import UnknownStatusFailure
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
class SpmgStates: class SpmgStates:
"""Enum for the EPICS MotorRecord's SPMG state""" """Enum for the EPICS MotorRecord's SPMG state"""
@@ -212,6 +214,45 @@ class EpicsMotor(OphydEpicsMotor):
return status return status
class EpicsUserMotorVME(PSIDeviceBase, EpicsMotor):
"""
EpicsMotor for VME based user motors. It includes additional checks for DISA and DISP.
"""
motor_resolution = Cpt(EpicsSignal, ".MRES", kind="config", auto_monitor=True)
base_velocity = Cpt(EpicsSignal, ".VBAS", kind="config", auto_monitor=True)
backlash_distance = Cpt(EpicsSignal, ".BDST", kind="config", auto_monitor=True)
_ioc_enable = Cpt(EpicsSignal, "_able", kind=Kind.omitted, string=True, auto_monitor=True)
def wait_for_connection(self, all_signals=False, timeout: float | None = None) -> None:
"""
Wait for connection with an additional check first if the IOC is enabled.
"""
if self._ioc_enable.get(use_monitor=False) != "Enable":
self._ioc_enable.put("Enable")
hl_switch = self.high_limit_switch.get(use_monitor=False)
ll_switch = self.low_limit_switch.get(use_monitor=False)
if hl_switch == 1 and ll_switch == 1:
raise RuntimeError(
f"Both limit switches are active for device {self.name}."
f"This often indicates that the motor is disconnected! Please double-check your hardware!"
)
return super().wait_for_connection(all_signals, timeout)
def on_connected(self):
self._ioc_enable.subscribe(self._ioc_enable_changed, run=False)
def _ioc_enable_changed(self, value, **kwargs):
"""Callback for IOC enable signal changes"""
if self.device_manager is None:
return # no device manager assigned
if self.name not in self.device_manager.devices:
return # device not loaded in device_manager
self.device_manager.devices[self.name].enabled = value == "Enable"
class EpicsMotorEC(EpicsMotor): class EpicsMotorEC(EpicsMotor):
"""Detailed ECMC EPICS motor class """Detailed ECMC EPICS motor class

View File

@@ -3,13 +3,15 @@ PSI motor integration from the ophyd_devices.devices.psi_motor module."""
from __future__ import annotations from __future__ import annotations
import threading
from unittest import mock from unittest import mock
import ophyd import ophyd
import pytest import pytest
from ophyd_devices.devices.psi_motor import EpicsMotor, EpicsMotorEC, SpmgStates from ophyd_devices.devices.epics_motor_ex import EpicsMotorEx
from ophyd_devices.tests.utils import patched_device from ophyd_devices.devices.psi_motor import EpicsMotor, EpicsMotorEC, EpicsUserMotors, SpmgStates
from ophyd_devices.tests.utils import MockPV, patched_device
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
@@ -160,3 +162,17 @@ def test_epics_motor_high_limit_switch_raises(mock_epics_motor):
motor.high_limit_switch._read_pv.mock_data = 1 # Simulate high limit switch active motor.high_limit_switch._read_pv.mock_data = 1 # Simulate high limit switch active
with pytest.raises(ophyd.utils.LimitError): with pytest.raises(ophyd.utils.LimitError):
motor.move(15) motor.move(15)
def test_epics_vme_user_motor():
"""Test extended VME based user motor implementation EpicsUserMotors."""
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
device = EpicsUserMotors(name="test_motor_ex", prefix="SIM:MOTOR:EX")
# Should raise
with pytest.raises(RuntimeError):
device.wait_for_connection(all_signals=True)
# Should not raise
device._ioc_enable._read_pv.mock_data = "Enable"
device.wait_for_connection(all_signals=True)