From 1d6caa22918cd1bbc3696dbb0df507f2ed6d3773 Mon Sep 17 00:00:00 2001 From: appel_c Date: Tue, 16 Sep 2025 17:35:27 +0200 Subject: [PATCH] refactor(pilatus-curtain): cleanup pilatus curtain --- debye_bec/devices/pilatus_curtain.py | 63 ++++++++++------------------ 1 file changed, 21 insertions(+), 42 deletions(-) diff --git a/debye_bec/devices/pilatus_curtain.py b/debye_bec/devices/pilatus_curtain.py index 6304800..4f8ab4e 100644 --- a/debye_bec/devices/pilatus_curtain.py +++ b/debye_bec/devices/pilatus_curtain.py @@ -1,73 +1,54 @@ """ES2 Pilatus Curtain""" from __future__ import annotations + import enum from typing import TYPE_CHECKING from ophyd import Component as Cpt from ophyd import EpicsSignal, EpicsSignalRO +from ophyd_devices import CompareStatus, DeviceStatus from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase -from ophyd_devices import CompareStatus, AndStatusWithList, DeviceStatus + +from debye_bec.devices.pilatus.utils import AndStatusWithList if TYPE_CHECKING: from bec_lib.devicemanager import ScanInfo + class PilatusCurtainError(Exception): """PilatusCurtain specific exception""" + class COVER(int, enum.Enum): """Pilatus Curtain States""" + # TODO What are the proper states here? - Probably enums for the states are better. OPEN = 0 CLOSED = 0 ERROR = 1 + class PilatusCurtain(PSIDeviceBase): """Class for the ES2 Pilatus Curtain""" USER_ACCESS = ["open", "close"] - open_cover = Cpt( - EpicsSignal, - suffix="OpenCover", - kind="config", - doc="Open Cover" - ) + open_cover = Cpt(EpicsSignal, suffix="OpenCover", kind="config", doc="Open Cover") - close_cover = Cpt( - EpicsSignal, - suffix="CloseCover", - kind="config", - doc="Close Cover" - ) + close_cover = Cpt(EpicsSignal, suffix="CloseCover", kind="config", doc="Close Cover") cover_is_closed = Cpt( - EpicsSignalRO, - suffix="CoverIsClosed", - kind="config", - doc="Cover is closed" + EpicsSignalRO, suffix="CoverIsClosed", kind="config", doc="Cover is closed" ) - cover_is_open = Cpt( - EpicsSignalRO, - suffix="CoverIsOpen", - kind="config", - doc="Cover is open" - ) + cover_is_open = Cpt(EpicsSignalRO, suffix="CoverIsOpen", kind="config", doc="Cover is open") cover_is_moving = Cpt( - EpicsSignalRO, - suffix="CoverIsMoving", - kind="config", - doc="Cover is moving" + EpicsSignalRO, suffix="CoverIsMoving", kind="config", doc="Cover is moving" ) - cover_error = Cpt( - EpicsSignalRO, - suffix="CoverError", - kind="config", - doc="Cover error" - ) + cover_error = Cpt(EpicsSignalRO, suffix="CoverError", kind="config", doc="Cover error") def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) @@ -82,7 +63,7 @@ class PilatusCurtain(PSIDeviceBase): Default values for signals should be set here. """ if self.cover_error.get() == COVER.ERROR: - raise PilatusCurtainError('Pilatus Curtain is in an error state!') + raise PilatusCurtainError("Pilatus Curtain is in an error state!") def on_stage(self) -> DeviceStatus | None: """Called while staging the device.""" @@ -100,11 +81,10 @@ class PilatusCurtain(PSIDeviceBase): """Open the cover""" if self.cover_is_closed.get() == COVER.CLOSED: self.open_cover.put(1) + # TODO timeout ok? status_open = CompareStatus(self.cover_is_open, COVER.OPEN, timeout=5) - status_error = CompareStatus(self.cover_error, COVER.ERROR, operation='!=') - status = AndStatusWithList( - device=self, status_list=[status_open, status_error] - ) + status_error = CompareStatus(self.cover_error, COVER.ERROR, operation="!=") + status = AndStatusWithList(device=self, status_list=[status_open, status_error]) return status else: return None @@ -113,11 +93,10 @@ class PilatusCurtain(PSIDeviceBase): """Close the cover""" if self.cover_is_open.get() == COVER.OPEN: self.close_cover.put(1) + # TODO timeout ok? status_close = CompareStatus(self.cover_is_closed, COVER.CLOSED, timeout=5) - status_error = CompareStatus(self.cover_error, COVER.ERROR, operation='!=') - status = AndStatusWithList( - device=self, status_list=[status_close, status_error] - ) + status_error = CompareStatus(self.cover_error, COVER.ERROR, operation="!=") + status = AndStatusWithList(device=self, status_list=[status_close, status_error]) return status else: return None