refactor(pilatus-curtain): cleanup pilatus curtain
This commit is contained in:
@@ -1,73 +1,54 @@
|
||||
"""ES2 Pilatus Curtain"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import EpicsSignal, EpicsSignalRO
|
||||
from ophyd_devices import CompareStatus, DeviceStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from ophyd_devices import CompareStatus, AndStatusWithList, DeviceStatus
|
||||
|
||||
from debye_bec.devices.pilatus.utils import AndStatusWithList
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
|
||||
|
||||
class PilatusCurtainError(Exception):
|
||||
"""PilatusCurtain specific exception"""
|
||||
|
||||
|
||||
class COVER(int, enum.Enum):
|
||||
"""Pilatus Curtain States"""
|
||||
|
||||
# TODO What are the proper states here? - Probably enums for the states are better.
|
||||
OPEN = 0
|
||||
CLOSED = 0
|
||||
ERROR = 1
|
||||
|
||||
|
||||
class PilatusCurtain(PSIDeviceBase):
|
||||
"""Class for the ES2 Pilatus Curtain"""
|
||||
|
||||
USER_ACCESS = ["open", "close"]
|
||||
|
||||
open_cover = Cpt(
|
||||
EpicsSignal,
|
||||
suffix="OpenCover",
|
||||
kind="config",
|
||||
doc="Open Cover"
|
||||
)
|
||||
open_cover = Cpt(EpicsSignal, suffix="OpenCover", kind="config", doc="Open Cover")
|
||||
|
||||
close_cover = Cpt(
|
||||
EpicsSignal,
|
||||
suffix="CloseCover",
|
||||
kind="config",
|
||||
doc="Close Cover"
|
||||
)
|
||||
close_cover = Cpt(EpicsSignal, suffix="CloseCover", kind="config", doc="Close Cover")
|
||||
|
||||
cover_is_closed = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="CoverIsClosed",
|
||||
kind="config",
|
||||
doc="Cover is closed"
|
||||
EpicsSignalRO, suffix="CoverIsClosed", kind="config", doc="Cover is closed"
|
||||
)
|
||||
|
||||
cover_is_open = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="CoverIsOpen",
|
||||
kind="config",
|
||||
doc="Cover is open"
|
||||
)
|
||||
cover_is_open = Cpt(EpicsSignalRO, suffix="CoverIsOpen", kind="config", doc="Cover is open")
|
||||
|
||||
cover_is_moving = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="CoverIsMoving",
|
||||
kind="config",
|
||||
doc="Cover is moving"
|
||||
EpicsSignalRO, suffix="CoverIsMoving", kind="config", doc="Cover is moving"
|
||||
)
|
||||
|
||||
cover_error = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="CoverError",
|
||||
kind="config",
|
||||
doc="Cover error"
|
||||
)
|
||||
cover_error = Cpt(EpicsSignalRO, suffix="CoverError", kind="config", doc="Cover error")
|
||||
|
||||
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
|
||||
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||
@@ -82,7 +63,7 @@ class PilatusCurtain(PSIDeviceBase):
|
||||
Default values for signals should be set here.
|
||||
"""
|
||||
if self.cover_error.get() == COVER.ERROR:
|
||||
raise PilatusCurtainError('Pilatus Curtain is in an error state!')
|
||||
raise PilatusCurtainError("Pilatus Curtain is in an error state!")
|
||||
|
||||
def on_stage(self) -> DeviceStatus | None:
|
||||
"""Called while staging the device."""
|
||||
@@ -100,11 +81,10 @@ class PilatusCurtain(PSIDeviceBase):
|
||||
"""Open the cover"""
|
||||
if self.cover_is_closed.get() == COVER.CLOSED:
|
||||
self.open_cover.put(1)
|
||||
# TODO timeout ok?
|
||||
status_open = CompareStatus(self.cover_is_open, COVER.OPEN, timeout=5)
|
||||
status_error = CompareStatus(self.cover_error, COVER.ERROR, operation='!=')
|
||||
status = AndStatusWithList(
|
||||
device=self, status_list=[status_open, status_error]
|
||||
)
|
||||
status_error = CompareStatus(self.cover_error, COVER.ERROR, operation="!=")
|
||||
status = AndStatusWithList(device=self, status_list=[status_open, status_error])
|
||||
return status
|
||||
else:
|
||||
return None
|
||||
@@ -113,11 +93,10 @@ class PilatusCurtain(PSIDeviceBase):
|
||||
"""Close the cover"""
|
||||
if self.cover_is_open.get() == COVER.OPEN:
|
||||
self.close_cover.put(1)
|
||||
# TODO timeout ok?
|
||||
status_close = CompareStatus(self.cover_is_closed, COVER.CLOSED, timeout=5)
|
||||
status_error = CompareStatus(self.cover_error, COVER.ERROR, operation='!=')
|
||||
status = AndStatusWithList(
|
||||
device=self, status_list=[status_close, status_error]
|
||||
)
|
||||
status_error = CompareStatus(self.cover_error, COVER.ERROR, operation="!=")
|
||||
status = AndStatusWithList(device=self, status_list=[status_close, status_error])
|
||||
return status
|
||||
else:
|
||||
return None
|
||||
|
||||
Reference in New Issue
Block a user