diff --git a/ophyd_devices/devices/__init__.py b/ophyd_devices/devices/__init__.py index 4926733..08b8e73 100644 --- a/ophyd_devices/devices/__init__.py +++ b/ophyd_devices/devices/__init__.py @@ -1,7 +1,6 @@ from ophyd.quadem import QuadEM from ophyd.sim import SynAxis, SynPeriodicSignal, SynSignal -from .epics_motor_ex import EpicsMotorEx from .psi_motor import EpicsMotor, EpicsMotorEC from .sls_devices import SLSInfo, SLSOperatorMessages from .SpmBase import SpmBase diff --git a/ophyd_devices/devices/epics_motor_ex.py b/ophyd_devices/devices/epics_motor_ex.py index 16523ad..2cc3aa9 100644 --- a/ophyd_devices/devices/epics_motor_ex.py +++ b/ophyd_devices/devices/epics_motor_ex.py @@ -1,66 +1,18 @@ -from ophyd import Component as Cpt -from ophyd import EpicsMotor, EpicsSignal +# """Module extending EpicsMotor for VME based User motors with extra configuration fields.""" + +# from ophyd import Component as Cpt +# from ophyd import EpicsSignal + +# # from ophyd_devices.devices.psi_motor import EpicsUserMotors -class EpicsMotorEx(EpicsMotor): - """Extend EpicsMotor with extra configuration fields. - motor_done_move - motor_is_moving - """ +# class EpicsMotorEx(EpicsUserMotors): +# """ +# EpicsMotor that extends a VME based user motor with additional configuration signals, +# motor_resolution, base_velocity and backlash_distance. +# """ - # configuration - motor_resolution = Cpt(EpicsSignal, ".MRES", kind="config", auto_monitor=True) - base_velocity = Cpt(EpicsSignal, ".VBAS", kind="config", auto_monitor=True) - backlash_distance = Cpt(EpicsSignal, ".BDST", kind="config", auto_monitor=True) - - def __init__( - self, - prefix="", - *, - name, - kind=None, - read_attrs=None, - configuration_attrs=None, - parent=None, - **kwargs, - ): - # get configuration attributes from kwargs and then remove them - attrs = {} - for key, value in kwargs.items(): - if hasattr(EpicsMotorEx, key) and isinstance(getattr(EpicsMotorEx, key), Cpt): - attrs[key] = value - for key in attrs: - kwargs.pop(key) - - super().__init__( - prefix, - name=name, - kind=kind, - read_attrs=read_attrs, - configuration_attrs=configuration_attrs, - parent=parent, - **kwargs, - ) - - # set configuration attributes - for key, value in attrs.items(): - # print out attributes that are being configured - print("setting ", key, "=", value) - getattr(self, key).put(value) - - # self.motor_done_move.subscribe(self._progress_update, run=False) - - # def kickoff(self) -> DeviceStatus: - # status = DeviceStatus(self) - # self.move( - # self._kickoff_params.get("position"), - # wait = False - # ) - # return status - - # def _progress_update(self, value, **kwargs) -> None: - # self._run_subs( - # sub_type=self.SUB_PROGRESS, - # value=value , - # done= 1, - # ) +# # configuration +# motor_resolution = Cpt(EpicsSignal, ".MRES", kind="config", auto_monitor=True) +# base_velocity = Cpt(EpicsSignal, ".VBAS", kind="config", auto_monitor=True) +# backlash_distance = Cpt(EpicsSignal, ".BDST", kind="config", auto_monitor=True) diff --git a/ophyd_devices/devices/psi_motor.py b/ophyd_devices/devices/psi_motor.py index 29b792b..f641786 100644 --- a/ophyd_devices/devices/psi_motor.py +++ b/ophyd_devices/devices/psi_motor.py @@ -14,6 +14,8 @@ from ophyd.status import MoveStatus from ophyd.utils.epics_pvs import AlarmSeverity, fmt_time from ophyd.utils.errors import UnknownStatusFailure +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase + class SpmgStates: """Enum for the EPICS MotorRecord's SPMG state""" @@ -212,6 +214,45 @@ class EpicsMotor(OphydEpicsMotor): return status +class EpicsUserMotorVME(PSIDeviceBase, EpicsMotor): + """ + EpicsMotor for VME based user motors. It includes additional checks for DISA and DISP. + + """ + + motor_resolution = Cpt(EpicsSignal, ".MRES", kind="config", auto_monitor=True) + base_velocity = Cpt(EpicsSignal, ".VBAS", kind="config", auto_monitor=True) + backlash_distance = Cpt(EpicsSignal, ".BDST", kind="config", auto_monitor=True) + + _ioc_enable = Cpt(EpicsSignal, "_able", kind=Kind.omitted, string=True, auto_monitor=True) + + def wait_for_connection(self, all_signals=False, timeout: float | None = None) -> None: + """ + Wait for connection with an additional check first if the IOC is enabled. + """ + if self._ioc_enable.get(use_monitor=False) != "Enable": + self._ioc_enable.put("Enable") + hl_switch = self.high_limit_switch.get(use_monitor=False) + ll_switch = self.low_limit_switch.get(use_monitor=False) + if hl_switch == 1 and ll_switch == 1: + raise RuntimeError( + f"Both limit switches are active for device {self.name}." + f"This often indicates that the motor is disconnected! Please double-check your hardware!" + ) + return super().wait_for_connection(all_signals, timeout) + + def on_connected(self): + self._ioc_enable.subscribe(self._ioc_enable_changed, run=False) + + def _ioc_enable_changed(self, value, **kwargs): + """Callback for IOC enable signal changes""" + if self.device_manager is None: + return # no device manager assigned + if self.name not in self.device_manager.devices: + return # device not loaded in device_manager + self.device_manager.devices[self.name].enabled = value == "Enable" + + class EpicsMotorEC(EpicsMotor): """Detailed ECMC EPICS motor class diff --git a/tests/test_psi_motors.py b/tests/test_psi_motors.py index 6dde638..1a0a50c 100644 --- a/tests/test_psi_motors.py +++ b/tests/test_psi_motors.py @@ -3,13 +3,15 @@ PSI motor integration from the ophyd_devices.devices.psi_motor module.""" from __future__ import annotations +import threading from unittest import mock import ophyd import pytest -from ophyd_devices.devices.psi_motor import EpicsMotor, EpicsMotorEC, SpmgStates -from ophyd_devices.tests.utils import patched_device +from ophyd_devices.devices.epics_motor_ex import EpicsMotorEx +from ophyd_devices.devices.psi_motor import EpicsMotor, EpicsMotorEC, EpicsUserMotors, SpmgStates +from ophyd_devices.tests.utils import MockPV, patched_device @pytest.fixture(scope="function") @@ -160,3 +162,17 @@ def test_epics_motor_high_limit_switch_raises(mock_epics_motor): motor.high_limit_switch._read_pv.mock_data = 1 # Simulate high limit switch active with pytest.raises(ophyd.utils.LimitError): motor.move(15) + + +def test_epics_vme_user_motor(): + """Test extended VME based user motor implementation EpicsUserMotors.""" + with mock.patch.object(ophyd, "cl") as mock_cl: + mock_cl.get_pv = MockPV + mock_cl.thread_class = threading.Thread + device = EpicsUserMotors(name="test_motor_ex", prefix="SIM:MOTOR:EX") + # Should raise + with pytest.raises(RuntimeError): + device.wait_for_connection(all_signals=True) + # Should not raise + device._ioc_enable._read_pv.mock_data = "Enable" + device.wait_for_connection(all_signals=True)