refactor(shutter): Refactor signal structure of shutter device

This commit is contained in:
2025-11-28 13:14:56 +01:00
committed by David Perl
parent ad1b042f2e
commit 77eaca174e
2 changed files with 84 additions and 59 deletions

View File

@@ -1,47 +1,43 @@
"""
Module for the optics shutter device at PSI beamlines.
Example config:
shutter:
description: Optics Shutter A
deviceClass: ophyd_devices.optics_shutter.Shutter
deviceConfig: {prefix: 'X10SA-EH1-PSYS:SH-A-'}
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
softwareTrigger: false
userParameter: {}
"""
from enum import IntEnum from enum import IntEnum
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import EpicsSignal, EpicsSignalRO, Kind from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind, Signal
from ophyd.status import SubscriptionStatus
from ophyd_devices import CompareStatus from ophyd_devices import CompareStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
class ShutterOpenState(IntEnum): class ShutterOpenState(IntEnum):
"""Enum for shutter open state."""
OPEN = 1 OPEN = 1
CLOSED = 0 CLOSED = 0
class ShutterEnabled(IntEnum): class ShutterEnabled(IntEnum):
"""Enum for shutter enabled state."""
ENABLED = 1 ENABLED = 1
DISABLED = 0 DISABLED = 0
class Shutter(PSIDeviceBase): class ShutterControl(Device):
"""A generic optics shutter device, for IOCs with the format '[BEAMLINE]-EH1-PSYS:SH-[A/B]-' """Control interface for the PVs related to the shutter."""
Example config:
shutter:
description: Optics Shutter A
deviceClass: ophyd_devices.optics_shutter.Shutter
deviceConfig: {prefix: 'X10SA-EH1-PSYS:SH-A-'}
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
softwareTrigger: false
userParameter: {}
Example usage:
shutter = Shutter(name="shutter", prefix="X10SA-EH1-PSYS:SH-A-")
if shutter.enabled == ShutterEnabled.ENABLED:
st = shutter.open()
st.wait()
"""
USER_ACCESS = ["open", "close", "status", "enabled"]
is_open = Cpt(EpicsSignalRO, "OPEN", kind=Kind.config, auto_monitor=True) is_open = Cpt(EpicsSignalRO, "OPEN", kind=Kind.config, auto_monitor=True)
is_closed = Cpt(EpicsSignalRO, "CLOSE", kind=Kind.omitted) is_closed = Cpt(EpicsSignalRO, "CLOSE", kind=Kind.omitted)
@@ -51,33 +47,56 @@ class Shutter(PSIDeviceBase):
set_open = Cpt(EpicsSignal, "OPEN-SET", kind=Kind.omitted) set_open = Cpt(EpicsSignal, "OPEN-SET", kind=Kind.omitted)
set_closed = Cpt(EpicsSignal, "CLOSE-SET", kind=Kind.omitted) set_closed = Cpt(EpicsSignal, "CLOSE-SET", kind=Kind.omitted)
def _check_enabled(self):
if self.enabled() != ShutterEnabled.ENABLED:
raise RuntimeError("The shutter is disabled!")
def open(self): class ShutterOpenSignal(Signal):
"""Open the shutter. """
ShutterOpenSignal. When called with 1, it will try to open the shutter.
Returns: ophyd.status.SubscriptionStatus which resolved when the shutter is opened. If called with 0, it will try to close the shutter.
""" """
self._check_enabled()
self.set_open.put(1)
return CompareStatus(self.is_open, ShutterOpenState.OPEN)
def close(self):
"""Close the shutter.
Returns: ophyd.status.SubscriptionStatus which resolved when the shutter is closed.
"""
self._check_enabled()
self.set_closed.put(1)
return CompareStatus(self.is_open, ShutterOpenState.CLOSED)
def status(self) -> ShutterOpenState:
return ShutterOpenState(self.is_open.get())
def enabled(self) -> ShutterEnabled: def enabled(self) -> ShutterEnabled:
return ShutterEnabled(self.is_enabled.get()) """Check if the shutter is enabled."""
return ShutterEnabled(self.parent.epics_control.is_enabled.get())
def _check_enabled(self):
if self.enabled() != ShutterEnabled.ENABLED:
dev_name = self.parent.name if self.parent else "unknown"
raise RuntimeError(f"The shutter {dev_name} is disabled!")
def get(self) -> int:
return self.parent.epics_control.is_open.get()
def put(self, value: int, **kwargs) -> None:
self._check_enabled()
if value == ShutterOpenState.OPEN:
self.parent.epics_control.set_open.put(1, **kwargs)
elif value == ShutterOpenState.CLOSED:
self.parent.epics_control.set_closed.put(1, **kwargs)
else:
raise ValueError("Invalid value for ShutterOpenSignal. Use 0 (CLOSED) or 1 (OPEN).")
def set(self, value: int, **kwargs) -> CompareStatus:
self.put(value, **kwargs)
return CompareStatus(self.parent.epics_control.is_open, value)
class Shutter(Device):
"""A shutter device with shutter open signal, and sub-device with full control PVs of the Epics implementation."""
set_open = Cpt(
ShutterOpenSignal,
name="open",
kind=Kind.omitted,
doc="Signal to open/close the shutter. Use 0 (CLOSED) or 1 (OPEN).",
)
is_open = Cpt(
EpicsSignalRO,
"OPEN",
kind=Kind.config,
auto_monitor=True,
doc="Readback of the shutter open state. 0 (CLOSED) or 1 (OPEN).",
)
epics_control = Cpt(ShutterControl, suffix="", name="epics_control", kind=Kind.omitted)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -11,31 +11,37 @@ def mock_shutter():
def test_shutter_open(mock_shutter): def test_shutter_open(mock_shutter):
mock_shutter.is_enabled._read_pv.mock_data = ShutterEnabled.ENABLED.value mock_shutter.epics_control.is_enabled._read_pv.mock_data = ShutterEnabled.ENABLED.value
mock_shutter.epics_control.is_open._read_pv.mock_data = ShutterOpenState.CLOSED.value
mock_shutter.is_open._read_pv.mock_data = ShutterOpenState.CLOSED.value mock_shutter.is_open._read_pv.mock_data = ShutterOpenState.CLOSED.value
st = mock_shutter.open() st = mock_shutter.set_open.set(1)
assert not st.done assert not st.done
mock_shutter.is_open._read_pv.mock_data = ShutterOpenState.OPEN.value mock_shutter.is_open._read_pv.mock_data = ShutterOpenState.OPEN.value
mock_shutter.epics_control.is_open._read_pv.mock_data = ShutterOpenState.OPEN.value
assert st.done assert st.done
def test_shutter_close(mock_shutter): def test_shutter_close(mock_shutter):
mock_shutter.is_enabled._read_pv.mock_data = ShutterEnabled.ENABLED.value mock_shutter.epics_control.is_enabled._read_pv.mock_data = ShutterEnabled.ENABLED.value
mock_shutter.epics_control.is_open._read_pv.mock_data = ShutterOpenState.OPEN.value
mock_shutter.is_open._read_pv.mock_data = ShutterOpenState.OPEN.value mock_shutter.is_open._read_pv.mock_data = ShutterOpenState.OPEN.value
st = mock_shutter.close() st = mock_shutter.set_open.set(0)
assert not st.done assert not st.done
mock_shutter.is_open._read_pv.mock_data = ShutterOpenState.CLOSED.value mock_shutter.is_open._read_pv.mock_data = ShutterOpenState.CLOSED.value
mock_shutter.epics_control.is_open._read_pv.mock_data = ShutterOpenState.CLOSED.value
assert st.done assert st.done
def test_shutter_not_enabled(mock_shutter): def test_shutter_not_enabled(mock_shutter):
with pytest.raises(RuntimeError) as e: with pytest.raises(RuntimeError) as e:
mock_shutter.open() mock_shutter.set_open.set(1)
assert e.match("The shutter is disabled!") assert e.match(f"The shutter {mock_shutter.name} is disabled!")
def test_shutter_status(mock_shutter): def test_shutter_status(mock_shutter):
mock_shutter.epics_control.is_open._read_pv.mock_data = ShutterOpenState.OPEN.value
mock_shutter.is_open._read_pv.mock_data = ShutterOpenState.OPEN.value mock_shutter.is_open._read_pv.mock_data = ShutterOpenState.OPEN.value
assert mock_shutter.status() == ShutterOpenState.OPEN.value assert mock_shutter.is_open.get() == ShutterOpenState.OPEN.value
mock_shutter.epics_control.is_open._read_pv.mock_data = ShutterOpenState.CLOSED.value
mock_shutter.is_open._read_pv.mock_data = ShutterOpenState.CLOSED.value mock_shutter.is_open._read_pv.mock_data = ShutterOpenState.CLOSED.value
assert mock_shutter.status() == ShutterOpenState.CLOSED.value assert mock_shutter.is_open.get() == ShutterOpenState.CLOSED.value