refactoring

This commit is contained in:
x01da
2025-09-16 08:39:17 +02:00
parent 2a75e7572e
commit ff9c2f39b0
2 changed files with 131 additions and 115 deletions

View File

@@ -1,15 +1,19 @@
"""Ionization chamber device class"""
from __future__ import annotations
from typing import TYPE_CHECKING, Literal
from typeguard import typechecked
import numpy as np
from ophyd import Component as Cpt
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Kind
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd.status import DeviceStatus, SubscriptionStatus
from ophyd_devices import CompareStatus, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typeguard import typechecked
from debye_bec.devices.ionization_chambers.ionization_chamber_enums import (
AmplifierEnable,
@@ -110,18 +114,10 @@ class IonizationChamber0(PSIDeviceBase):
"""
if self.amp.cOnOff.get() == AmplifierEnable.OFF:
status = CompareStatus(self.amp.cOnOff, AmplifierEnable.ON)
self.cancel_on_stop(status)
self.amp.cOnOff.put(AmplifierEnable.ON)
# Wait until channel is switched on
def _wait_enabled():
return self.amp.cOnOff.get() == AmplifierEnable.ON
if not self.wait_for_condition(
_wait_enabled, check_stopped=True, timeout=self.timeout_for_pvwait
):
raise TimeoutError(
f"Enabling channel run into timeout after {self.timeout_for_pvwait} seconds"
)
status.wait(self.timeout_for_pvwait)
match gain:
case "1e6":
@@ -144,21 +140,13 @@ class IonizationChamber0(PSIDeviceBase):
"""Configure the filter setting of the specified channel
Args:
value (Literal['1us', '3us', '10us', '30us', '100us', '300us', '1ms', '3ms']) : Desired filter
value (Literal['1us','3us','10us','30us','100us','300us','1ms','3ms']) :Desired filter
"""
if self.amp.cOnOff.get() == AmplifierEnable.OFF:
status = CompareStatus(self.amp.cOnOff, AmplifierEnable.ON)
self.cancel_on_stop(status)
self.amp.cOnOff.put(AmplifierEnable.ON)
# Wait until channel is switched on
def _wait_enabled():
return self.amp.cOnOff.get() == AmplifierEnable.ON
if not self.wait_for_condition(
_wait_enabled, check_stopped=True, timeout=self.timeout_for_pvwait
):
raise TimeoutError(
f"Enabling channel run into timeout after {self.timeout_for_pvwait} seconds"
)
status.wait(self.timeout_for_pvwait)
match value:
case "1us":
@@ -187,20 +175,16 @@ class IonizationChamber0(PSIDeviceBase):
hv (float) : Desired voltage for the 'HV' terminal. Voltage has to be between 0...3000
"""
if not (0 <= hv <= 3000):
if not 0 <= hv <= 3000:
raise ValueError(f"specified HV {hv} not within range [0 .. 3000]")
if not np.isclose(np.abs(hv - self.hv.grid_v.get()), 0, atol=3):
raise ValueError(f"Grid {self.hv.grid_v.get()} must not be higher than HV {hv}!")
if not self.hv_en.ena.get() == 1:
def check_ch_ena(*, old_value, value, **kwargs):
return value == 1
status = SubscriptionStatus(device=self.hv_en.ena, callback=check_ch_ena)
status = CompareStatus(self.hv_en.ena, 1)
self.cancel_on_stop(status)
self.hv_en.ena.put(1)
# Wait after setting ena to 1
status.wait(timeout=2)
status.wait(self.timeout_for_pvwait)
# Set current fixed to 3 mA (max)
self.hv.hv_i.put(3)
@@ -212,23 +196,20 @@ class IonizationChamber0(PSIDeviceBase):
enable the high voltage (if external enable is active)!
Args:
grid (float) : Desired voltage for the 'Grid' terminal, Grid Voltage has to be between 0...3000
grid (float) : Desired voltage for the 'Grid' terminal,
Grid Voltage has to be between 0...3000
"""
if not (0 <= grid <= 3000):
if not 0 <= grid <= 3000:
raise ValueError(f"specified Grid {grid} not within range [0 .. 3000]")
if not np.isclose(np.abs(grid - self.hv.hv_v.get()), 0, atol=3):
raise ValueError(f"Grid {grid} must not be higher than HV {self.hv.hv_v.get()}!")
if not self.hv_en.ena.get() == 1:
def check_ch_ena(*, old_value, value, **kwargs):
return value == 1
status = SubscriptionStatus(device=self.hv_en.ena, callback=check_ch_ena)
status = CompareStatus(self.hv_en.ena, 1)
self.cancel_on_stop(status)
self.hv_en.ena.put(1)
# Wait after setting ena to 1
status.wait(timeout=2)
status.wait(self.timeout_for_pvwait)
# Set current fixed to 3 mA (max)
self.hv.grid_i.put(3)
@@ -244,7 +225,7 @@ class IonizationChamber0(PSIDeviceBase):
pressure: float,
*,
wait: bool = False,
) -> DeviceStatus:
) -> DeviceStatus | None:
"""Fill an ionization chamber with the specified gas mixture.
Args:
@@ -256,13 +237,13 @@ class IonizationChamber0(PSIDeviceBase):
wait (bool): If you like to wait for the filling to finish.
"""
if not (0 <= conc1 <= 100):
if not 0 <= conc1 <= 100:
raise ValueError(f"Concentration 1 {conc1} out of range [0 .. 100 %]")
if not (0 <= conc2 <= 100):
if not 0 <= conc2 <= 100:
raise ValueError(f"Concentration 2 {conc2} out of range [0 .. 100 %]")
if not np.isclose((conc1 + conc2), 100, atol=0.1):
raise ValueError(f"Conc1 {conc1} and conc2 {conc2} must sum to 100 +- 0.1")
if not (0 <= pressure <= 3):
if not 0 <= pressure <= 3:
raise ValueError(f"Pressure {pressure} out of range [0 .. 3 bar abs]")
self.gmes.gas1_req.set(gas1).wait(timeout=3)
@@ -270,28 +251,13 @@ class IonizationChamber0(PSIDeviceBase):
self.gmes.gas2_req.set(gas2).wait(timeout=3)
self.gmes.conc2_req.set(conc2).wait(timeout=3)
status = TransitionStatus(self.gmes.status.get(), [0, 1])
self.cancel_on_stop(status)
self.gmes.fill.put(1)
def wait_for_status():
return self.gmes.status.get() == 0
timeout = 3
if not self.wait_for_condition(wait_for_status, timeout=timeout, check_stopped=True):
raise TimeoutError(
f"Ionization chamber filling process did not start after {timeout}s. Last log message {self.gmes_status.get()}"
)
def wait_for_filling_finished():
return self.gmes.status.get() == 1
# Wait until ionization chamber is filled successfully
status = self.task_handler.submit_task(
task=self.wait_for_condition, task_args=(wait_for_filling_finished, 360, True)
)
if wait:
status.wait()
return status
status.wait(timeout=360)
else:
return status
class IonizationChamber1(IonizationChamber0):
"""Ionization Chamber 1, prefix should be 'X01DA-'."""

View File

@@ -1,75 +1,125 @@
"""ES2 Pilatus Curtain"""
import time
from __future__ import annotations
import enum
from typing import TYPE_CHECKING
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind
from ophyd_devices.utils import bec_utils
from ophyd import EpicsSignal, EpicsSignalRO
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices import CompareStatus, AndStatusWithList, DeviceStatus
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
class GasMixSetup(Device):
class PilatusCurtainError(Exception):
"""PilatusCurtain specific exception"""
class COVER(int, enum.Enum):
"""Pilatus Curtain States"""
OPEN = 0
CLOSED = 0
ERROR = 1
class PilatusCurtain(PSIDeviceBase):
"""Class for the ES2 Pilatus Curtain"""
USER_ACCESS = ["open", "close"]
open_cover = Cpt(EpicsSignal, suffix="OpenCover", kind="config", doc="Open Cover")
close_cover = Cpt(EpicsSignal, suffix="CloseCover", kind="config", doc="Close Cover")
open_cover = Cpt(
EpicsSignal,
suffix="OpenCover",
kind="config",
doc="Open Cover"
)
close_cover = Cpt(
EpicsSignal,
suffix="CloseCover",
kind="config",
doc="Close Cover"
)
cover_is_closed = Cpt(
EpicsSignalRO, suffix="CoverIsClosed", kind="config", doc="Cover is closed"
EpicsSignalRO,
suffix="CoverIsClosed",
kind="config",
doc="Cover is closed"
)
cover_is_open = Cpt(EpicsSignalRO, suffix="CoverIsOpen", kind="config", doc="Cover is open")
cover_is_open = Cpt(
EpicsSignalRO,
suffix="CoverIsOpen",
kind="config",
doc="Cover is open"
)
cover_is_moving = Cpt(
EpicsSignalRO, suffix="CoverIsMoving", kind="config", doc="Cover is moving"
EpicsSignalRO,
suffix="CoverIsMoving",
kind="config",
doc="Cover is moving"
)
cover_error = Cpt(EpicsSignalRO, suffix="CoverError", kind="config", doc="Cover error")
def __init__(
self, prefix="", *, name: str, kind: Kind = None, device_manager=None, parent=None, **kwargs
):
"""Initialize the Pilatus Curtain.
cover_error = Cpt(
EpicsSignalRO,
suffix="CoverError",
kind="config",
doc="Cover error"
)
Args:
prefix (str): EPICS prefix for the device
name (str): Name of the device
kind (Kind): Kind of the device
device_manager (DeviceManager): Device manager instance
parent (Device): Parent device
kwargs: Additional keyword arguments
"""
super().__init__(prefix, name=name, kind=kind, parent=parent, **kwargs)
self.device_manager = device_manager
self.service_cfg = None
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.timeout_for_pvwait = 30
self.readback.name = self.name
# Wait for connection on all components, ensure IOC is connected
self.wait_for_connection(all_signals=True, timeout=5)
if device_manager:
self.device_manager = device_manager
else:
self.device_manager = bec_utils.DMMock()
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
if self.cover_error.get() == COVER.ERROR:
raise PilatusCurtainError('Pilatus Curtain is in an error state!')
self.connector = self.device_manager.connector
def on_stage(self) -> DeviceStatus | None:
"""Called while staging the device."""
return self.open()
def open(self) -> None:
def on_unstage(self) -> DeviceStatus | None:
"""Called while unstaging the device."""
return self.close()
def on_stop(self) -> DeviceStatus | None:
"""Called when the device is stopped."""
return self.close()
def open(self) -> DeviceStatus | None:
"""Open the cover"""
if self.cover_is_closed.get() == COVER.CLOSED:
self.open_cover.put(1)
status_open = CompareStatus(self.cover_is_open, COVER.OPEN, timeout=5)
status_error = CompareStatus(self.cover_error, COVER.ERROR, operation='!=')
status = AndStatusWithList(
device=self, status_list=[status_open, status_error]
)
self.cancel_on_stop(status)
return status
else:
return None
self.open_cover.put(1)
while not self.cover_is_open.get():
time.sleep(0.1)
if self.cover_error.get():
raise TimeoutError("Curtain did not open successfully and is now in an error state")
def close(self) -> None:
def close(self) -> DeviceStatus | None:
"""Close the cover"""
self.close_cover.put(1)
while not self.cover_is_closed.get():
time.sleep(0.1)
if self.cover_error.get():
raise TimeoutError(
"Curtain did not close successfully and is now in an error state"
)
if self.cover_is_open.get() == COVER.OPEN:
self.close_cover.put(1)
status_close = CompareStatus(self.cover_is_closed, COVER.CLOSED, timeout=5)
status_error = CompareStatus(self.cover_error, COVER.ERROR, operation='!=')
status = AndStatusWithList(
device=self, status_list=[status_close, status_error]
)
self.cancel_on_stop(status)
return status
else:
return None