diff --git a/debye_bec/device_configs/x01da_frontend.yaml b/debye_bec/device_configs/x01da_frontend.yaml index bc097a3..b91a259 100644 --- a/debye_bec/device_configs/x01da_frontend.yaml +++ b/debye_bec/device_configs/x01da_frontend.yaml @@ -1,4 +1,18 @@ +################################### +## Frontend Absorber ## +################################### + +abs: + readoutPriority: baseline + description: Frontend Absorber + deviceClass: debye_bec.devices.absorber.Absorber + deviceConfig: + prefix: "X01DA-FE-ABS1:" + onFailure: retry + enabled: true + softwareTrigger: false + ################################### ## Frontend Slits ## ################################### diff --git a/debye_bec/devices/absorber.py b/debye_bec/devices/absorber.py new file mode 100644 index 0000000..1755a44 --- /dev/null +++ b/debye_bec/devices/absorber.py @@ -0,0 +1,71 @@ +"""Frontend Absorber""" + +from __future__ import annotations + +import enum +from typing import TYPE_CHECKING + +from ophyd import Component as Cpt +from ophyd import EpicsSignal, EpicsSignalRO +from ophyd_devices import CompareStatus, DeviceStatus +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase + +if TYPE_CHECKING: + from bec_lib.devicemanager import ScanInfo + +class AbsorberError(Exception): + """Absorber specific exception""" + +class STATUS(int, enum.Enum): + """Absorber States""" + + MOVING_CLOSE = 0 + OPEN = 1 + MOVING_OPEN = 2 + CLOSED = 3 + NOT_ENABLED = 4 + TIMEOUT_CLOSE = 5 + TIMEOUT_OPEN = 6 + CLOSE_LS_LOST = 7 + OPEN_LS_LOST = 8 + CLOSE_LS_NOT_FREE = 9 + OPEN_LS_NOT_FREE = 10 + ERROR_LS = 11 + TO_CONNECT = 12 + MAN_OPEN = 13 + UNDEFINED = 14 + +class Absorber(PSIDeviceBase): + """Class for the Frontend Absorber""" + + USER_ACCESS = ["open", "close"] + + request = Cpt(EpicsSignal, suffix="REQUEST", kind="config", doc="Open/Close Absorber") + status = Cpt(EpicsSignalRO, suffix="STATUS", kind="config", doc="Absorber Status") + + def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): + super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) + + self.timeout_for_move = 10 + # Wait for connection on all components, ensure IOC is connected + self.wait_for_connection(all_signals=True, timeout=5) + + def open(self) -> DeviceStatus | None: + """Open the Absorber""" + if self.status.get() == STATUS.CLOSED: + self.request.put(1) + status_open = CompareStatus(self.status, STATUS.OPEN, timeout=self.timeout_for_move) + status = status_open + return status + else: + return None + + def close(self) -> DeviceStatus | None: + """Close the Absorber""" + if self.status.get() == STATUS.OPEN: + self.request.put(1) + status_close = CompareStatus(self.status, STATUS.CLOSED, timeout=self.timeout_for_move) + status = status_close + return status + else: + return None