diff --git a/debye_bec/devices/ionization_chambers/ionization_chamber.py b/debye_bec/devices/ionization_chambers/ionization_chamber.py index 2b7bfd6..deb970e 100644 --- a/debye_bec/devices/ionization_chambers/ionization_chamber.py +++ b/debye_bec/devices/ionization_chambers/ionization_chamber.py @@ -1,15 +1,19 @@ +"""Ionization chamber device class""" + from __future__ import annotations from typing import TYPE_CHECKING, Literal +from typeguard import typechecked import numpy as np from ophyd import Component as Cpt from ophyd import Device from ophyd import DynamicDeviceComponent as Dcpt -from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Kind +from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV from ophyd.status import DeviceStatus, SubscriptionStatus +from ophyd_devices import CompareStatus, TransitionStatus from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase -from typeguard import typechecked + from debye_bec.devices.ionization_chambers.ionization_chamber_enums import ( AmplifierEnable, @@ -110,18 +114,10 @@ class IonizationChamber0(PSIDeviceBase): """ if self.amp.cOnOff.get() == AmplifierEnable.OFF: + status = CompareStatus(self.amp.cOnOff, AmplifierEnable.ON) + self.cancel_on_stop(status) self.amp.cOnOff.put(AmplifierEnable.ON) - - # Wait until channel is switched on - def _wait_enabled(): - return self.amp.cOnOff.get() == AmplifierEnable.ON - - if not self.wait_for_condition( - _wait_enabled, check_stopped=True, timeout=self.timeout_for_pvwait - ): - raise TimeoutError( - f"Enabling channel run into timeout after {self.timeout_for_pvwait} seconds" - ) + status.wait(self.timeout_for_pvwait) match gain: case "1e6": @@ -144,21 +140,13 @@ class IonizationChamber0(PSIDeviceBase): """Configure the filter setting of the specified channel Args: - value (Literal['1us', '3us', '10us', '30us', '100us', '300us', '1ms', '3ms']) : Desired filter + value (Literal['1us','3us','10us','30us','100us','300us','1ms','3ms']) :Desired filter """ if self.amp.cOnOff.get() == AmplifierEnable.OFF: + status = CompareStatus(self.amp.cOnOff, AmplifierEnable.ON) + self.cancel_on_stop(status) self.amp.cOnOff.put(AmplifierEnable.ON) - - # Wait until channel is switched on - def _wait_enabled(): - return self.amp.cOnOff.get() == AmplifierEnable.ON - - if not self.wait_for_condition( - _wait_enabled, check_stopped=True, timeout=self.timeout_for_pvwait - ): - raise TimeoutError( - f"Enabling channel run into timeout after {self.timeout_for_pvwait} seconds" - ) + status.wait(self.timeout_for_pvwait) match value: case "1us": @@ -187,20 +175,16 @@ class IonizationChamber0(PSIDeviceBase): hv (float) : Desired voltage for the 'HV' terminal. Voltage has to be between 0...3000 """ - if not (0 <= hv <= 3000): + if not 0 <= hv <= 3000: raise ValueError(f"specified HV {hv} not within range [0 .. 3000]") if not np.isclose(np.abs(hv - self.hv.grid_v.get()), 0, atol=3): raise ValueError(f"Grid {self.hv.grid_v.get()} must not be higher than HV {hv}!") if not self.hv_en.ena.get() == 1: - - def check_ch_ena(*, old_value, value, **kwargs): - return value == 1 - - status = SubscriptionStatus(device=self.hv_en.ena, callback=check_ch_ena) + status = CompareStatus(self.hv_en.ena, 1) + self.cancel_on_stop(status) self.hv_en.ena.put(1) - # Wait after setting ena to 1 - status.wait(timeout=2) + status.wait(self.timeout_for_pvwait) # Set current fixed to 3 mA (max) self.hv.hv_i.put(3) @@ -212,23 +196,20 @@ class IonizationChamber0(PSIDeviceBase): enable the high voltage (if external enable is active)! Args: - grid (float) : Desired voltage for the 'Grid' terminal, Grid Voltage has to be between 0...3000 + grid (float) : Desired voltage for the 'Grid' terminal, + Grid Voltage has to be between 0...3000 """ - if not (0 <= grid <= 3000): + if not 0 <= grid <= 3000: raise ValueError(f"specified Grid {grid} not within range [0 .. 3000]") if not np.isclose(np.abs(grid - self.hv.hv_v.get()), 0, atol=3): raise ValueError(f"Grid {grid} must not be higher than HV {self.hv.hv_v.get()}!") if not self.hv_en.ena.get() == 1: - - def check_ch_ena(*, old_value, value, **kwargs): - return value == 1 - - status = SubscriptionStatus(device=self.hv_en.ena, callback=check_ch_ena) + status = CompareStatus(self.hv_en.ena, 1) + self.cancel_on_stop(status) self.hv_en.ena.put(1) - # Wait after setting ena to 1 - status.wait(timeout=2) + status.wait(self.timeout_for_pvwait) # Set current fixed to 3 mA (max) self.hv.grid_i.put(3) @@ -244,7 +225,7 @@ class IonizationChamber0(PSIDeviceBase): pressure: float, *, wait: bool = False, - ) -> DeviceStatus: + ) -> DeviceStatus | None: """Fill an ionization chamber with the specified gas mixture. Args: @@ -256,13 +237,13 @@ class IonizationChamber0(PSIDeviceBase): wait (bool): If you like to wait for the filling to finish. """ - if not (0 <= conc1 <= 100): + if not 0 <= conc1 <= 100: raise ValueError(f"Concentration 1 {conc1} out of range [0 .. 100 %]") - if not (0 <= conc2 <= 100): + if not 0 <= conc2 <= 100: raise ValueError(f"Concentration 2 {conc2} out of range [0 .. 100 %]") if not np.isclose((conc1 + conc2), 100, atol=0.1): raise ValueError(f"Conc1 {conc1} and conc2 {conc2} must sum to 100 +- 0.1") - if not (0 <= pressure <= 3): + if not 0 <= pressure <= 3: raise ValueError(f"Pressure {pressure} out of range [0 .. 3 bar abs]") self.gmes.gas1_req.set(gas1).wait(timeout=3) @@ -270,28 +251,13 @@ class IonizationChamber0(PSIDeviceBase): self.gmes.gas2_req.set(gas2).wait(timeout=3) self.gmes.conc2_req.set(conc2).wait(timeout=3) + status = TransitionStatus(self.gmes.status.get(), [0, 1]) + self.cancel_on_stop(status) self.gmes.fill.put(1) - - def wait_for_status(): - return self.gmes.status.get() == 0 - - timeout = 3 - if not self.wait_for_condition(wait_for_status, timeout=timeout, check_stopped=True): - raise TimeoutError( - f"Ionization chamber filling process did not start after {timeout}s. Last log message {self.gmes_status.get()}" - ) - - def wait_for_filling_finished(): - return self.gmes.status.get() == 1 - - # Wait until ionization chamber is filled successfully - status = self.task_handler.submit_task( - task=self.wait_for_condition, task_args=(wait_for_filling_finished, 360, True) - ) if wait: - status.wait() - return status - + status.wait(timeout=360) + else: + return status class IonizationChamber1(IonizationChamber0): """Ionization Chamber 1, prefix should be 'X01DA-'.""" diff --git a/debye_bec/devices/pilatus_curtain.py b/debye_bec/devices/pilatus_curtain.py index 94c05b8..0cc5c0a 100644 --- a/debye_bec/devices/pilatus_curtain.py +++ b/debye_bec/devices/pilatus_curtain.py @@ -1,75 +1,125 @@ """ES2 Pilatus Curtain""" -import time +from __future__ import annotations +import enum +from typing import TYPE_CHECKING from ophyd import Component as Cpt -from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind -from ophyd_devices.utils import bec_utils +from ophyd import EpicsSignal, EpicsSignalRO +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase +from ophyd_devices import CompareStatus, AndStatusWithList, DeviceStatus +if TYPE_CHECKING: + from bec_lib.devicemanager import ScanInfo -class GasMixSetup(Device): +class PilatusCurtainError(Exception): + """PilatusCurtain specific exception""" + +class COVER(int, enum.Enum): + """Pilatus Curtain States""" + + OPEN = 0 + CLOSED = 0 + ERROR = 1 + +class PilatusCurtain(PSIDeviceBase): """Class for the ES2 Pilatus Curtain""" USER_ACCESS = ["open", "close"] - open_cover = Cpt(EpicsSignal, suffix="OpenCover", kind="config", doc="Open Cover") - close_cover = Cpt(EpicsSignal, suffix="CloseCover", kind="config", doc="Close Cover") + open_cover = Cpt( + EpicsSignal, + suffix="OpenCover", + kind="config", + doc="Open Cover" + ) + + close_cover = Cpt( + EpicsSignal, + suffix="CloseCover", + kind="config", + doc="Close Cover" + ) + cover_is_closed = Cpt( - EpicsSignalRO, suffix="CoverIsClosed", kind="config", doc="Cover is closed" + EpicsSignalRO, + suffix="CoverIsClosed", + kind="config", + doc="Cover is closed" ) - cover_is_open = Cpt(EpicsSignalRO, suffix="CoverIsOpen", kind="config", doc="Cover is open") + + cover_is_open = Cpt( + EpicsSignalRO, + suffix="CoverIsOpen", + kind="config", + doc="Cover is open" + ) + cover_is_moving = Cpt( - EpicsSignalRO, suffix="CoverIsMoving", kind="config", doc="Cover is moving" + EpicsSignalRO, + suffix="CoverIsMoving", + kind="config", + doc="Cover is moving" ) - cover_error = Cpt(EpicsSignalRO, suffix="CoverError", kind="config", doc="Cover error") - def __init__( - self, prefix="", *, name: str, kind: Kind = None, device_manager=None, parent=None, **kwargs - ): - """Initialize the Pilatus Curtain. + cover_error = Cpt( + EpicsSignalRO, + suffix="CoverError", + kind="config", + doc="Cover error" + ) - Args: - prefix (str): EPICS prefix for the device - name (str): Name of the device - kind (Kind): Kind of the device - device_manager (DeviceManager): Device manager instance - parent (Device): Parent device - kwargs: Additional keyword arguments - """ - super().__init__(prefix, name=name, kind=kind, parent=parent, **kwargs) - self.device_manager = device_manager - self.service_cfg = None + def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): + super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) self.timeout_for_pvwait = 30 - self.readback.name = self.name # Wait for connection on all components, ensure IOC is connected self.wait_for_connection(all_signals=True, timeout=5) - if device_manager: - self.device_manager = device_manager - else: - self.device_manager = bec_utils.DMMock() + def on_connected(self) -> None: + """ + Called after the device is connected and its signals are connected. + Default values for signals should be set here. + """ + if self.cover_error.get() == COVER.ERROR: + raise PilatusCurtainError('Pilatus Curtain is in an error state!') - self.connector = self.device_manager.connector + def on_stage(self) -> DeviceStatus | None: + """Called while staging the device.""" + return self.open() - def open(self) -> None: + def on_unstage(self) -> DeviceStatus | None: + """Called while unstaging the device.""" + return self.close() + + def on_stop(self) -> DeviceStatus | None: + """Called when the device is stopped.""" + return self.close() + + def open(self) -> DeviceStatus | None: """Open the cover""" + if self.cover_is_closed.get() == COVER.CLOSED: + self.open_cover.put(1) + status_open = CompareStatus(self.cover_is_open, COVER.OPEN, timeout=5) + status_error = CompareStatus(self.cover_error, COVER.ERROR, operation='!=') + status = AndStatusWithList( + device=self, status_list=[status_open, status_error] + ) + self.cancel_on_stop(status) + return status + else: + return None - self.open_cover.put(1) - - while not self.cover_is_open.get(): - time.sleep(0.1) - if self.cover_error.get(): - raise TimeoutError("Curtain did not open successfully and is now in an error state") - - def close(self) -> None: + def close(self) -> DeviceStatus | None: """Close the cover""" - - self.close_cover.put(1) - - while not self.cover_is_closed.get(): - time.sleep(0.1) - if self.cover_error.get(): - raise TimeoutError( - "Curtain did not close successfully and is now in an error state" - ) + if self.cover_is_open.get() == COVER.OPEN: + self.close_cover.put(1) + status_close = CompareStatus(self.cover_is_closed, COVER.CLOSED, timeout=5) + status_error = CompareStatus(self.cover_error, COVER.ERROR, operation='!=') + status = AndStatusWithList( + device=self, status_list=[status_close, status_error] + ) + self.cancel_on_stop(status) + return status + else: + return None