mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2026-02-07 07:38:40 +01:00
fix(undulator): add check for operator control for stop_signal
This commit is contained in:
@@ -2,20 +2,72 @@
|
|||||||
Module for undulator control
|
Module for undulator control
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import enum
|
||||||
|
|
||||||
|
from bec_lib.logger import bec_logger
|
||||||
from ophyd import EpicsSignal, EpicsSignalRO, PVPositioner
|
from ophyd import EpicsSignal, EpicsSignalRO, PVPositioner
|
||||||
from ophyd.device import Component as Cpt
|
from ophyd.device import Component as Cpt
|
||||||
|
from ophyd.signal import DEFAULT_CONNECTION_TIMEOUT, DEFAULT_WRITE_TIMEOUT
|
||||||
from ophyd.status import MoveStatus
|
from ophyd.status import MoveStatus
|
||||||
|
|
||||||
|
logger = bec_logger.logger
|
||||||
|
|
||||||
|
|
||||||
|
class UNDULATORCONTROL(int, enum.Enum):
|
||||||
|
"""
|
||||||
|
Enum for undulator control modes.
|
||||||
|
"""
|
||||||
|
|
||||||
|
OPERATOR = 0
|
||||||
|
BEAMLINE = 1
|
||||||
|
|
||||||
|
|
||||||
|
class UndulatorEpicsSignal(EpicsSignal):
|
||||||
|
"""
|
||||||
|
SLS Undulator setpoint control
|
||||||
|
"""
|
||||||
|
|
||||||
|
def put(
|
||||||
|
self,
|
||||||
|
value,
|
||||||
|
force=False,
|
||||||
|
connection_timeout=DEFAULT_CONNECTION_TIMEOUT,
|
||||||
|
callback=None,
|
||||||
|
use_complete=None,
|
||||||
|
timeout=DEFAULT_WRITE_TIMEOUT,
|
||||||
|
**kwargs,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Put a value to the setpoint PV.
|
||||||
|
|
||||||
|
If the undulator is operator controlled, it will not move.
|
||||||
|
"""
|
||||||
|
if self.parent.select_control.get() == UNDULATORCONTROL.OPERATOR.value:
|
||||||
|
raise PermissionError(
|
||||||
|
f"Cannot use put for signal {self.name}; Undulator is operator controlled!"
|
||||||
|
)
|
||||||
|
return super().put(
|
||||||
|
value,
|
||||||
|
force=force,
|
||||||
|
connection_timeout=connection_timeout,
|
||||||
|
callback=callback,
|
||||||
|
use_complete=use_complete,
|
||||||
|
timeout=timeout,
|
||||||
|
**kwargs,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class UndulatorGap(PVPositioner):
|
class UndulatorGap(PVPositioner):
|
||||||
"""
|
"""
|
||||||
SLS Undulator gap control
|
SLS Undulator gap control
|
||||||
"""
|
"""
|
||||||
|
|
||||||
setpoint = Cpt(EpicsSignal, suffix="GAP-SP")
|
setpoint = Cpt(UndulatorEpicsSignal, suffix="GAP-SP")
|
||||||
readback = Cpt(EpicsSignal, suffix="GAP-RBV", kind="hinted", auto_monitor=True)
|
readback = Cpt(EpicsSignal, suffix="GAP-RBV", kind="hinted", auto_monitor=True)
|
||||||
|
|
||||||
stop_signal = Cpt(EpicsSignal, suffix="STOP")
|
stop_signal = Cpt(UndulatorEpicsSignal, suffix="STOP")
|
||||||
done = Cpt(EpicsSignalRO, suffix="DONE", auto_monitor=True)
|
done = Cpt(EpicsSignalRO, suffix="DONE", auto_monitor=True)
|
||||||
|
|
||||||
select_control = Cpt(EpicsSignalRO, suffix="SCTRL", auto_monitor=True)
|
select_control = Cpt(EpicsSignalRO, suffix="SCTRL", auto_monitor=True)
|
||||||
@@ -45,17 +97,21 @@ class UndulatorGap(PVPositioner):
|
|||||||
# Make the default alias for the user_readback the name of the
|
# Make the default alias for the user_readback the name of the
|
||||||
# motor itself.
|
# motor itself.
|
||||||
self.readback.name = self.name
|
self.readback.name = self.name
|
||||||
|
self.readback._metadata["write_access"] = False
|
||||||
|
|
||||||
def move(self, position, wait=True, timeout=None, moved_cb=None):
|
def move(self, position, wait=True, timeout=None, moved_cb=None):
|
||||||
|
|
||||||
# If it is operator controlled, undulator will not move.
|
# If it is operator controlled, undulator will not move.
|
||||||
if self.select_control.get() == 0:
|
if self.select_control.get() == 0:
|
||||||
raise Exception("Undulator is operator controlled!")
|
raise PermissionError("Undulator is operator controlled!")
|
||||||
|
|
||||||
# If it is already there, undulator will not move. The done flag
|
# If it is already there, undulator will not move. The done flag
|
||||||
# will not change, the moving change callback will not be called.
|
# will not change, the moving change callback will not be called.
|
||||||
# The status will not change.
|
# The status will not change.
|
||||||
if abs(position - self._position) < 0.0008:
|
if self._position is not None and abs(position - self._position) < 0.0008:
|
||||||
|
logger.info(
|
||||||
|
f"Undulator gap {self.name} already close to position {position}, not moving."
|
||||||
|
)
|
||||||
status = MoveStatus(self, position, done=True, success=True)
|
status = MoveStatus(self, position, done=True, success=True)
|
||||||
return status
|
return status
|
||||||
|
|
||||||
|
|||||||
@@ -49,6 +49,14 @@ def test_instant_completion_within_deadband(
|
|||||||
|
|
||||||
def test_undulator_raises_when_disabled(mock_undulator):
|
def test_undulator_raises_when_disabled(mock_undulator):
|
||||||
mock_undulator.select_control._read_pv.mock_data = 0
|
mock_undulator.select_control._read_pv.mock_data = 0
|
||||||
with pytest.raises(Exception) as e:
|
with pytest.raises(PermissionError) as e:
|
||||||
mock_undulator.move(5)
|
mock_undulator.move(5)
|
||||||
assert e.match("Undulator is operator controlled!")
|
assert e.match("Undulator is operator controlled!")
|
||||||
|
|
||||||
|
|
||||||
|
def test_undulator_stop_call(mock_undulator):
|
||||||
|
mock_undulator.select_control._read_pv.mock_data = 1
|
||||||
|
mock_undulator.stop()
|
||||||
|
mock_undulator.select_control._read_pv.mock_data = 0
|
||||||
|
with pytest.raises(PermissionError) as e:
|
||||||
|
mock_undulator.stop()
|
||||||
|
|||||||
Reference in New Issue
Block a user