From 77eaca174e4e2c68500362b267da0ebca3d38013 Mon Sep 17 00:00:00 2001 From: appel_c Date: Fri, 28 Nov 2025 13:14:56 +0100 Subject: [PATCH] refactor(shutter): Refactor signal structure of shutter device --- ophyd_devices/devices/optics_shutter.py | 121 ++++++++++++++---------- tests/test_shutter.py | 22 +++-- 2 files changed, 84 insertions(+), 59 deletions(-) diff --git a/ophyd_devices/devices/optics_shutter.py b/ophyd_devices/devices/optics_shutter.py index 2ccb11d..4026d59 100644 --- a/ophyd_devices/devices/optics_shutter.py +++ b/ophyd_devices/devices/optics_shutter.py @@ -1,47 +1,43 @@ +""" +Module for the optics shutter device at PSI beamlines. + +Example config: +shutter: + description: Optics Shutter A + deviceClass: ophyd_devices.optics_shutter.Shutter + deviceConfig: {prefix: 'X10SA-EH1-PSYS:SH-A-'} + enabled: true + onFailure: retry + readOnly: false + readoutPriority: baseline + softwareTrigger: false + userParameter: {} +""" + from enum import IntEnum from ophyd import Component as Cpt -from ophyd import EpicsSignal, EpicsSignalRO, Kind -from ophyd.status import SubscriptionStatus +from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind, Signal from ophyd_devices import CompareStatus -from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase class ShutterOpenState(IntEnum): + """Enum for shutter open state.""" + OPEN = 1 CLOSED = 0 class ShutterEnabled(IntEnum): + """Enum for shutter enabled state.""" + ENABLED = 1 DISABLED = 0 -class Shutter(PSIDeviceBase): - """A generic optics shutter device, for IOCs with the format '[BEAMLINE]-EH1-PSYS:SH-[A/B]-' - - Example config: - shutter: - description: Optics Shutter A - deviceClass: ophyd_devices.optics_shutter.Shutter - deviceConfig: {prefix: 'X10SA-EH1-PSYS:SH-A-'} - enabled: true - onFailure: retry - readOnly: false - readoutPriority: baseline - softwareTrigger: false - userParameter: {} - - Example usage: - shutter = Shutter(name="shutter", prefix="X10SA-EH1-PSYS:SH-A-") - if shutter.enabled == ShutterEnabled.ENABLED: - st = shutter.open() - st.wait() - - """ - - USER_ACCESS = ["open", "close", "status", "enabled"] +class ShutterControl(Device): + """Control interface for the PVs related to the shutter.""" is_open = Cpt(EpicsSignalRO, "OPEN", kind=Kind.config, auto_monitor=True) is_closed = Cpt(EpicsSignalRO, "CLOSE", kind=Kind.omitted) @@ -51,33 +47,56 @@ class Shutter(PSIDeviceBase): set_open = Cpt(EpicsSignal, "OPEN-SET", kind=Kind.omitted) set_closed = Cpt(EpicsSignal, "CLOSE-SET", kind=Kind.omitted) - def _check_enabled(self): - if self.enabled() != ShutterEnabled.ENABLED: - raise RuntimeError("The shutter is disabled!") - def open(self): - """Open the shutter. - - Returns: ophyd.status.SubscriptionStatus which resolved when the shutter is opened. - """ - self._check_enabled() - self.set_open.put(1) - return CompareStatus(self.is_open, ShutterOpenState.OPEN) - - def close(self): - """Close the shutter. - - Returns: ophyd.status.SubscriptionStatus which resolved when the shutter is closed. - """ - self._check_enabled() - self.set_closed.put(1) - return CompareStatus(self.is_open, ShutterOpenState.CLOSED) - - def status(self) -> ShutterOpenState: - return ShutterOpenState(self.is_open.get()) +class ShutterOpenSignal(Signal): + """ + ShutterOpenSignal. When called with 1, it will try to open the shutter. + If called with 0, it will try to close the shutter. + """ def enabled(self) -> ShutterEnabled: - return ShutterEnabled(self.is_enabled.get()) + """Check if the shutter is enabled.""" + return ShutterEnabled(self.parent.epics_control.is_enabled.get()) + + def _check_enabled(self): + if self.enabled() != ShutterEnabled.ENABLED: + dev_name = self.parent.name if self.parent else "unknown" + raise RuntimeError(f"The shutter {dev_name} is disabled!") + + def get(self) -> int: + return self.parent.epics_control.is_open.get() + + def put(self, value: int, **kwargs) -> None: + self._check_enabled() + if value == ShutterOpenState.OPEN: + self.parent.epics_control.set_open.put(1, **kwargs) + elif value == ShutterOpenState.CLOSED: + self.parent.epics_control.set_closed.put(1, **kwargs) + else: + raise ValueError("Invalid value for ShutterOpenSignal. Use 0 (CLOSED) or 1 (OPEN).") + + def set(self, value: int, **kwargs) -> CompareStatus: + self.put(value, **kwargs) + return CompareStatus(self.parent.epics_control.is_open, value) + + +class Shutter(Device): + """A shutter device with shutter open signal, and sub-device with full control PVs of the Epics implementation.""" + + set_open = Cpt( + ShutterOpenSignal, + name="open", + kind=Kind.omitted, + doc="Signal to open/close the shutter. Use 0 (CLOSED) or 1 (OPEN).", + ) + is_open = Cpt( + EpicsSignalRO, + "OPEN", + kind=Kind.config, + auto_monitor=True, + doc="Readback of the shutter open state. 0 (CLOSED) or 1 (OPEN).", + ) + epics_control = Cpt(ShutterControl, suffix="", name="epics_control", kind=Kind.omitted) if __name__ == "__main__": diff --git a/tests/test_shutter.py b/tests/test_shutter.py index 3d45f34..016e31c 100644 --- a/tests/test_shutter.py +++ b/tests/test_shutter.py @@ -11,31 +11,37 @@ def mock_shutter(): def test_shutter_open(mock_shutter): - mock_shutter.is_enabled._read_pv.mock_data = ShutterEnabled.ENABLED.value + mock_shutter.epics_control.is_enabled._read_pv.mock_data = ShutterEnabled.ENABLED.value + mock_shutter.epics_control.is_open._read_pv.mock_data = ShutterOpenState.CLOSED.value mock_shutter.is_open._read_pv.mock_data = ShutterOpenState.CLOSED.value - st = mock_shutter.open() + st = mock_shutter.set_open.set(1) assert not st.done mock_shutter.is_open._read_pv.mock_data = ShutterOpenState.OPEN.value + mock_shutter.epics_control.is_open._read_pv.mock_data = ShutterOpenState.OPEN.value assert st.done def test_shutter_close(mock_shutter): - mock_shutter.is_enabled._read_pv.mock_data = ShutterEnabled.ENABLED.value + mock_shutter.epics_control.is_enabled._read_pv.mock_data = ShutterEnabled.ENABLED.value + mock_shutter.epics_control.is_open._read_pv.mock_data = ShutterOpenState.OPEN.value mock_shutter.is_open._read_pv.mock_data = ShutterOpenState.OPEN.value - st = mock_shutter.close() + st = mock_shutter.set_open.set(0) assert not st.done mock_shutter.is_open._read_pv.mock_data = ShutterOpenState.CLOSED.value + mock_shutter.epics_control.is_open._read_pv.mock_data = ShutterOpenState.CLOSED.value assert st.done def test_shutter_not_enabled(mock_shutter): with pytest.raises(RuntimeError) as e: - mock_shutter.open() - assert e.match("The shutter is disabled!") + mock_shutter.set_open.set(1) + assert e.match(f"The shutter {mock_shutter.name} is disabled!") def test_shutter_status(mock_shutter): + mock_shutter.epics_control.is_open._read_pv.mock_data = ShutterOpenState.OPEN.value mock_shutter.is_open._read_pv.mock_data = ShutterOpenState.OPEN.value - assert mock_shutter.status() == ShutterOpenState.OPEN.value + assert mock_shutter.is_open.get() == ShutterOpenState.OPEN.value + mock_shutter.epics_control.is_open._read_pv.mock_data = ShutterOpenState.CLOSED.value mock_shutter.is_open._read_pv.mock_data = ShutterOpenState.CLOSED.value - assert mock_shutter.status() == ShutterOpenState.CLOSED.value + assert mock_shutter.is_open.get() == ShutterOpenState.CLOSED.value