From 9eb1dea8eaf390ba317c245b4de4379593e7fff0 Mon Sep 17 00:00:00 2001 From: appel_c Date: Thu, 17 Jul 2025 15:12:39 +0200 Subject: [PATCH] fix(undulator): add check for operator control for stop_signal --- ophyd_devices/devices/undulator.py | 64 ++++++++++++++++++++++++++++-- tests/test_undulator.py | 10 ++++- 2 files changed, 69 insertions(+), 5 deletions(-) diff --git a/ophyd_devices/devices/undulator.py b/ophyd_devices/devices/undulator.py index 9da81db..1df9a5f 100644 --- a/ophyd_devices/devices/undulator.py +++ b/ophyd_devices/devices/undulator.py @@ -2,20 +2,72 @@ Module for undulator control """ +from __future__ import annotations + +import enum + +from bec_lib.logger import bec_logger from ophyd import EpicsSignal, EpicsSignalRO, PVPositioner from ophyd.device import Component as Cpt +from ophyd.signal import DEFAULT_CONNECTION_TIMEOUT, DEFAULT_WRITE_TIMEOUT from ophyd.status import MoveStatus +logger = bec_logger.logger + + +class UNDULATORCONTROL(int, enum.Enum): + """ + Enum for undulator control modes. + """ + + OPERATOR = 0 + BEAMLINE = 1 + + +class UndulatorEpicsSignal(EpicsSignal): + """ + SLS Undulator setpoint control + """ + + def put( + self, + value, + force=False, + connection_timeout=DEFAULT_CONNECTION_TIMEOUT, + callback=None, + use_complete=None, + timeout=DEFAULT_WRITE_TIMEOUT, + **kwargs, + ): + """ + Put a value to the setpoint PV. + + If the undulator is operator controlled, it will not move. + """ + if self.parent.select_control.get() == UNDULATORCONTROL.OPERATOR.value: + raise PermissionError( + f"Cannot use put for signal {self.name}; Undulator is operator controlled!" + ) + return super().put( + value, + force=force, + connection_timeout=connection_timeout, + callback=callback, + use_complete=use_complete, + timeout=timeout, + **kwargs, + ) + class UndulatorGap(PVPositioner): """ SLS Undulator gap control """ - setpoint = Cpt(EpicsSignal, suffix="GAP-SP") + setpoint = Cpt(UndulatorEpicsSignal, suffix="GAP-SP") readback = Cpt(EpicsSignal, suffix="GAP-RBV", kind="hinted", auto_monitor=True) - stop_signal = Cpt(EpicsSignal, suffix="STOP") + stop_signal = Cpt(UndulatorEpicsSignal, suffix="STOP") done = Cpt(EpicsSignalRO, suffix="DONE", auto_monitor=True) select_control = Cpt(EpicsSignalRO, suffix="SCTRL", auto_monitor=True) @@ -45,17 +97,21 @@ class UndulatorGap(PVPositioner): # Make the default alias for the user_readback the name of the # motor itself. self.readback.name = self.name + self.readback._metadata["write_access"] = False def move(self, position, wait=True, timeout=None, moved_cb=None): # If it is operator controlled, undulator will not move. if self.select_control.get() == 0: - raise Exception("Undulator is operator controlled!") + raise PermissionError("Undulator is operator controlled!") # If it is already there, undulator will not move. The done flag # will not change, the moving change callback will not be called. # The status will not change. - if abs(position - self._position) < 0.0008: + if self._position is not None and abs(position - self._position) < 0.0008: + logger.info( + f"Undulator gap {self.name} already close to position {position}, not moving." + ) status = MoveStatus(self, position, done=True, success=True) return status diff --git a/tests/test_undulator.py b/tests/test_undulator.py index cf346e2..967ae19 100644 --- a/tests/test_undulator.py +++ b/tests/test_undulator.py @@ -49,6 +49,14 @@ def test_instant_completion_within_deadband( def test_undulator_raises_when_disabled(mock_undulator): mock_undulator.select_control._read_pv.mock_data = 0 - with pytest.raises(Exception) as e: + with pytest.raises(PermissionError) as e: mock_undulator.move(5) assert e.match("Undulator is operator controlled!") + + +def test_undulator_stop_call(mock_undulator): + mock_undulator.select_control._read_pv.mock_data = 1 + mock_undulator.stop() + mock_undulator.select_control._read_pv.mock_data = 0 + with pytest.raises(PermissionError) as e: + mock_undulator.stop()