73 lines
2.2 KiB
Python
73 lines
2.2 KiB
Python
"""Frontend Absorber"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import enum
|
|
from typing import TYPE_CHECKING
|
|
|
|
from ophyd import Component as Cpt
|
|
from ophyd import EpicsSignal, EpicsSignalRO
|
|
from ophyd_devices import CompareStatus, DeviceStatus
|
|
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
|
|
|
if TYPE_CHECKING:
|
|
from bec_lib.devicemanager import ScanInfo
|
|
|
|
class AbsorberError(Exception):
    """Exception type reserved for absorber-related failures.

    Adds nothing on top of ``Exception``; it exists so callers can catch
    absorber problems separately from other errors.
    """
|
|
|
|
class STATUS(int, enum.Enum):
    """Integer codes used to interpret the absorber status signal.

    Members are plain ints (``int`` mixin), so they compare equal to the raw
    integer values. The codes run contiguously from 0 to 14.
    """

    # Resting positions and the transitions between them.
    MOVING_CLOSE = 0
    OPEN = 1
    MOVING_OPEN = 2
    CLOSED = 3

    # Device not enabled.
    NOT_ENABLED = 4

    # Timeouts while moving.
    TIMEOUT_CLOSE = 5
    TIMEOUT_OPEN = 6

    # "LS" codes -- presumably limit-switch conditions; confirm against the IOC.
    CLOSE_LS_LOST = 7
    OPEN_LS_LOST = 8
    CLOSE_LS_NOT_FREE = 9
    OPEN_LS_NOT_FREE = 10
    ERROR_LS = 11

    # Remaining codes.
    TO_CONNECT = 12
    MAN_OPEN = 13
    UNDEFINED = 14
|
|
|
|
class Absorber(PSIDeviceBase):
    """Device class for the frontend absorber.

    The absorber is operated through a writable ``REQUEST`` signal and
    observed through the ``STATUS`` record, which is read both as a number
    (``status``) and with ``string=True`` (``status_string``).
    """

    # Method names listed for user access.
    USER_ACCESS = ["open", "close"]

    # Writable request signal used by open() and close().
    request = Cpt(EpicsSignal, suffix="REQUEST", kind="config", doc="Open/Close Absorber")
    # Read-only status value; compared against STATUS members below.
    status = Cpt(EpicsSignalRO, suffix="STATUS", kind="normal", doc="Absorber Status")
    # Same STATUS suffix as above, but created with string=True.
    status_string = Cpt(
        EpicsSignalRO,
        suffix="STATUS",
        kind="normal",
        string=True,
        doc="Absorber Status",
    )

    def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
        """Create the device and wait for its signals to connect.

        Args:
            name: Device name, forwarded to the base class.
            prefix: Prefix forwarded to the base class; the component
                suffixes are defined relative to it.
            scan_info: Optional scan information forwarded to the base class.
            **kwargs: Any further keyword arguments for the base class.
        """
        super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)

        # Timeout handed to the status objects returned by open()/close()
        # (presumably seconds -- confirm against CompareStatus).
        self.timeout_for_move = 10

        # Wait for connection on all components, ensure IOC is connected
        self.wait_for_connection(all_signals=True, timeout=5)

    def open(self) -> DeviceStatus | None:
        """Request the absorber to open.

        Acts only when the current status equals ``STATUS.CLOSED``.

        Returns:
            A ``CompareStatus`` on the status signal targeting
            ``STATUS.OPEN`` with ``timeout_for_move`` as its timeout, or
            ``None`` when the absorber was not closed and no request was
            written.
        """
        if self.status.get() != STATUS.CLOSED:
            return None

        self.request.put(1)
        return CompareStatus(self.status, STATUS.OPEN, timeout=self.timeout_for_move)

    def close(self) -> DeviceStatus | None:
        """Request the absorber to close.

        Acts only when the current status equals ``STATUS.OPEN``.

        Returns:
            A ``CompareStatus`` on the status signal targeting
            ``STATUS.CLOSED`` with ``timeout_for_move`` as its timeout, or
            ``None`` when the absorber was not open and no request was
            written.
        """
        if self.status.get() != STATUS.OPEN:
            return None

        # NOTE(review): same request value as open(); presumably REQUEST acts
        # as a toggle -- confirm against the IOC.
        self.request.put(1)
        return CompareStatus(self.status, STATUS.CLOSED, timeout=self.timeout_for_move)
|