feat: Added frontend absorber
CI for debye_bec / test (push) Failing after 1m3s
CI for debye_bec / test (pull_request) Failing after 2m16s

This commit is contained in:
x01da
2026-04-27 15:20:02 +02:00
parent c428bb5a87
commit 4103b3153a
2 changed files with 85 additions and 0 deletions
@@ -1,4 +1,18 @@
###################################
## Frontend Absorber ##
###################################
abs:
readoutPriority: baseline
description: Frontend Absorber
deviceClass: debye_bec.devices.absorber.Absorber
deviceConfig:
prefix: "X01DA-FE-ABS1:"
onFailure: retry
enabled: true
softwareTrigger: false
###################################
## Frontend Slits ##
###################################
+71
View File
@@ -0,0 +1,71 @@
"""Frontend Absorber"""
from __future__ import annotations
import enum
from typing import TYPE_CHECKING
from ophyd import Component as Cpt
from ophyd import EpicsSignal, EpicsSignalRO
from ophyd_devices import CompareStatus, DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
class AbsorberError(Exception):
"""Absorber specific exception"""
class STATUS(int, enum.Enum):
"""Absorber States"""
MOVING_CLOSE = 0
OPEN = 1
MOVING_OPEN = 2
CLOSED = 3
NOT_ENABLED = 4
TIMEOUT_CLOSE = 5
TIMEOUT_OPEN = 6
CLOSE_LS_LOST = 7
OPEN_LS_LOST = 8
CLOSE_LS_NOT_FREE = 9
OPEN_LS_NOT_FREE = 10
ERROR_LS = 11
TO_CONNECT = 12
MAN_OPEN = 13
UNDEFINED = 14
class Absorber(PSIDeviceBase):
"""Class for the Frontend Absorber"""
USER_ACCESS = ["open", "close"]
request = Cpt(EpicsSignal, suffix="REQUEST", kind="config", doc="Open/Close Absorber")
status = Cpt(EpicsSignalRO, suffix="STATUS", kind="config", doc="Absorber Status")
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.timeout_for_move = 10
# Wait for connection on all components, ensure IOC is connected
self.wait_for_connection(all_signals=True, timeout=5)
def open(self) -> DeviceStatus | None:
"""Open the Absorber"""
if self.status.get() == STATUS.CLOSED:
self.request.put(1)
status_open = CompareStatus(self.status, STATUS.OPEN, timeout=self.timeout_for_move)
status = status_open
return status
else:
return None
def close(self) -> DeviceStatus | None:
"""Close the Absorber"""
if self.status.get() == STATUS.OPEN:
self.request.put(1)
status_close = CompareStatus(self.status, STATUS.CLOSED, timeout=self.timeout_for_move)
status = status_close
return status
else:
return None