refactor: formatting

This commit is contained in:
gac-x01da
2025-05-07 12:40:13 +02:00
parent 31ff28236b
commit 7b7a24b6c8
8 changed files with 436 additions and 184 deletions

View File

@@ -1,6 +1,6 @@
"""
Pre-startup script for BEC client. This script is executed before the BEC client
is started. It can be used to add additional command line arguments.
is started. It can be used to add additional command line arguments.
"""
from bec_lib.service_config import ServiceConfig

View File

@@ -1,6 +1,6 @@
from __future__ import annotations
from typing import Literal, TYPE_CHECKING
from typing import TYPE_CHECKING, Literal
import numpy as np
from ophyd import Component as Cpt
@@ -17,7 +17,7 @@ from debye_bec.devices.ionization_chambers.ionization_chamber_enums import (
AmplifierGain,
)
if TYPE_CHECKING: #pragma: no cover
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
@@ -63,7 +63,7 @@ class HighVoltageSuppliesControl(Device):
class IonizationChamber0(PSIDeviceBase):
"""Ionization Chamber 0, prefix should be 'X01DA-'."""
USER_ACCESS = ['set_gain', 'set_filter', 'set_hv', 'set_grid', 'fill']
USER_ACCESS = ["set_gain", "set_filter", "set_hv", "set_grid", "fill"]
num = 1
amp_signals = {
@@ -85,9 +85,7 @@ class IonizationChamber0(PSIDeviceBase):
}
amp = Dcpt(amp_signals)
gmes = Cpt(GasMixSetupControl, suffix=f"ES-GMES:IC{num-1}")
gmes_status = Cpt(
EpicsSignalRO, suffix="ES-GMES:StatusMsg0", kind="config", doc='Status'
)
gmes_status = Cpt(EpicsSignalRO, suffix="ES-GMES:StatusMsg0", kind="config", doc="Status")
hv = Cpt(HighVoltageSuppliesControl, suffix=f"ES1-IC{num-1}:")
hv_en_signals = {
"ext_ena": (
@@ -191,7 +189,7 @@ class IonizationChamber0(PSIDeviceBase):
if not 0 <= hv <= 3000:
raise ValueError(f"specified HV {hv} not within range [0 .. 3000]")
if not np.isclose(np.abs(hv - self.hv.grid_v.get()), 0, atol=3):
if not np.isclose(np.abs(hv - self.hv.grid_v.get()), 0, atol=3):
raise ValueError(f"Grid {self.hv.grid_v.get()} must not be higher than HV {hv}!")
if not self.hv_en.ena.get() == 1:
@@ -219,7 +217,7 @@ class IonizationChamber0(PSIDeviceBase):
if not 0 <= grid <= 3000:
raise ValueError(f"specified Grid {grid} not within range [0 .. 3000]")
if not np.isclose(np.abs(grid - self.hv.hv_v.get()), 0, atol=3):
if not np.isclose(np.abs(grid - self.hv.hv_v.get()), 0, atol=3):
raise ValueError(f"Grid {grid} must not be higher than HV {self.hv.hv_v.get()}!")
if not self.hv_en.ena.get() == 1:
@@ -318,9 +316,7 @@ class IonizationChamber1(IonizationChamber0):
}
amp = Dcpt(amp_signals)
gmes = Cpt(GasMixSetupControl, suffix=f"ES-GMES:IC{num-1}")
gmes_status = Cpt(
EpicsSignalRO, suffix="ES-GMES:StatusMsg0", kind="config", doc='Status'
)
gmes_status = Cpt(EpicsSignalRO, suffix="ES-GMES:StatusMsg0", kind="config", doc="Status")
hv = Cpt(HighVoltageSuppliesControl, suffix=f"ES2-IC{num-1}:")
hv_en_signals = {
"ext_ena": (
@@ -356,9 +352,7 @@ class IonizationChamber2(IonizationChamber0):
}
amp = Dcpt(amp_signals)
gmes = Cpt(GasMixSetupControl, suffix=f"ES-GMES:IC{num-1}")
gmes_status = Cpt(
EpicsSignalRO, suffix="ES-GMES:StatusMsg0", kind="config", doc='Status'
)
gmes_status = Cpt(EpicsSignalRO, suffix="ES-GMES:StatusMsg0", kind="config", doc="Status")
hv = Cpt(HighVoltageSuppliesControl, suffix=f"ES2-IC{num-1}:")
hv_en_signals = {
"ext_ena": (

View File

@@ -14,7 +14,7 @@ from typing import Any, Literal
from bec_lib.devicemanager import ScanInfo
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DeviceStatus, StatusBase, Signal
from ophyd import DeviceStatus, Signal, StatusBase
from ophyd.status import SubscriptionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.errors import DeviceStopError
@@ -201,7 +201,11 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
# Load the scan parameters to the controller
self.scan_control.scan_load.put(1)
# Wait for params to be checked from controller
self.wait_for_signal(self.scan_control.scan_msg, ScanControlLoadMessage.SUCCESS, timeout=self.timeout_for_pvwait)
self.wait_for_signal(
self.scan_control.scan_msg,
ScanControlLoadMessage.SUCCESS,
timeout=self.timeout_for_pvwait,
)
return None
def on_unstage(self) -> DeviceStatus | StatusBase | None:
@@ -231,6 +235,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
if value == ScanControlLoadMessage.PENDING:
return True
return False
status = SubscriptionStatus(self.scan_control.scan_msg, callback=callback)
self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait)
@@ -269,11 +274,12 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
if scan_duration < 0.1
else self.scan_control.scan_start_timer.put
)
def callback(*, old_value, value, **kwargs):
if old_value == ScanControlScanStatus.READY and value == ScanControlScanStatus.RUNNING:
return True
return False
status = SubscriptionStatus(self.scan_control.scan_status, callback=callback)
start_func(1)
return status
@@ -326,7 +332,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
while time.time() - start_time < timeout:
if signal.get() == value:
return None
if self.stopped is True: # Should this check be optional or configurable?!
if self.stopped is True: # Should this check be optional or configurable?!
raise DeviceStopError(f"Device {self.name} was stopped while waiting for signal")
time.sleep(0.1)
# If we end up here, the status did not resolve
@@ -448,7 +454,6 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
if hasattr(self.scan_parameter, key):
setattr(self.scan_parameter, key, value)
def _check_scan_msg(self, target_state: ScanControlLoadMessage) -> None:
"""Check if the scan message is gettting available
@@ -474,7 +479,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
status = SubscriptionStatus(self.scan_control.scan_msg, callback=callback)
self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait)
# try:
@@ -484,4 +489,3 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
# f"Timeout after {self.timeout_for_pvwait} while waiting for scan status,"
# f" current state: {ScanControlScanStatus(self.scan_control.scan_msg.get())}"
# ) from exc

View File

@@ -1,4 +1,4 @@
""" Module for additional utils of the Mo1 Bragg Positioner"""
"""Module for additional utils of the Mo1 Bragg Positioner"""
import numpy as np
from scipy.interpolate import BSpline

View File

@@ -1,18 +1,20 @@
from __future__ import annotations
from typing import Literal, TYPE_CHECKING, cast
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd import Device, Kind, DeviceStatus, Component as Cpt
from ophyd import EpicsSignal, EpicsSignalRO, StatusBase
from ophyd_devices.sim.sim_signals import SetableSignal
from typing import TYPE_CHECKING, Literal, cast
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.sim.sim_signals import SetableSignal
from debye_bec.devices.nidaq.nidaq_enums import (
NIDAQCompression,
ScanType,
NidaqState,
ScanRates,
ReadoutRange,
EncoderTypes,
NIDAQCompression,
NidaqState,
ReadoutRange,
ScanRates,
ScanType,
)
if TYPE_CHECKING: # pragma: no cover
@@ -29,69 +31,265 @@ class NidaqControl(Device):
"""Nidaq control class with all PVs"""
### Readback PVs for EpicsEmitter ###
ai0 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI0", kind=Kind.normal, doc="EPICS analog input 0",auto_monitor=True)
ai1 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI1", kind=Kind.normal, doc="EPICS analog input 1",auto_monitor=True)
ai2 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI2", kind=Kind.normal, doc="EPICS analog input 2",auto_monitor=True)
ai3 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI3", kind=Kind.normal, doc="EPICS analog input 3",auto_monitor=True)
ai4 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI4", kind=Kind.normal, doc="EPICS analog input 4",auto_monitor=True)
ai5 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI5", kind=Kind.normal, doc="EPICS analog input 5",auto_monitor=True)
ai6 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI6", kind=Kind.normal, doc="EPICS analog input 6",auto_monitor=True)
ai7 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI7", kind=Kind.normal, doc="EPICS analog input 7",auto_monitor=True)
ai0 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI0",
kind=Kind.normal,
doc="EPICS analog input 0",
auto_monitor=True,
)
ai1 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI1",
kind=Kind.normal,
doc="EPICS analog input 1",
auto_monitor=True,
)
ai2 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI2",
kind=Kind.normal,
doc="EPICS analog input 2",
auto_monitor=True,
)
ai3 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI3",
kind=Kind.normal,
doc="EPICS analog input 3",
auto_monitor=True,
)
ai4 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI4",
kind=Kind.normal,
doc="EPICS analog input 4",
auto_monitor=True,
)
ai5 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI5",
kind=Kind.normal,
doc="EPICS analog input 5",
auto_monitor=True,
)
ai6 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI6",
kind=Kind.normal,
doc="EPICS analog input 6",
auto_monitor=True,
)
ai7 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI7",
kind=Kind.normal,
doc="EPICS analog input 7",
auto_monitor=True,
)
ci0 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI0", kind=Kind.normal, doc="EPICS counter input 0", auto_monitor=True)
ci1 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI1", kind=Kind.normal, doc="EPICS counter input 1", auto_monitor=True)
ci2 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI2", kind=Kind.normal, doc="EPICS counter input 2", auto_monitor=True)
ci3 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI3", kind=Kind.normal, doc="EPICS counter input 3", auto_monitor=True)
ci4 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI4", kind=Kind.normal, doc="EPICS counter input 4", auto_monitor=True)
ci5 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI5", kind=Kind.normal, doc="EPICS counter input 5", auto_monitor=True)
ci6 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI6", kind=Kind.normal, doc="EPICS counter input 6", auto_monitor=True)
ci7 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI7", kind=Kind.normal, doc="EPICS counter input 7", auto_monitor=True)
ci0 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI0",
kind=Kind.normal,
doc="EPICS counter input 0",
auto_monitor=True,
)
ci1 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI1",
kind=Kind.normal,
doc="EPICS counter input 1",
auto_monitor=True,
)
ci2 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI2",
kind=Kind.normal,
doc="EPICS counter input 2",
auto_monitor=True,
)
ci3 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI3",
kind=Kind.normal,
doc="EPICS counter input 3",
auto_monitor=True,
)
ci4 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI4",
kind=Kind.normal,
doc="EPICS counter input 4",
auto_monitor=True,
)
ci5 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI5",
kind=Kind.normal,
doc="EPICS counter input 5",
auto_monitor=True,
)
ci6 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI6",
kind=Kind.normal,
doc="EPICS counter input 6",
auto_monitor=True,
)
ci7 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI7",
kind=Kind.normal,
doc="EPICS counter input 7",
auto_monitor=True,
)
di0 = Cpt(EpicsSignalRO, suffix="NIDAQ-DI0", kind=Kind.normal, doc="EPICS digital input 0", auto_monitor=True)
di1 = Cpt(EpicsSignalRO, suffix="NIDAQ-DI1", kind=Kind.normal, doc="EPICS digital input 1", auto_monitor=True)
di2 = Cpt(EpicsSignalRO, suffix="NIDAQ-DI2", kind=Kind.normal, doc="EPICS digital input 2", auto_monitor=True)
di3 = Cpt(EpicsSignalRO, suffix="NIDAQ-DI3", kind=Kind.normal, doc="EPICS digital input 3", auto_monitor=True)
di4 = Cpt(EpicsSignalRO, suffix="NIDAQ-DI4", kind=Kind.normal, doc="EPICS digital input 4", auto_monitor=True)
di0 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI0",
kind=Kind.normal,
doc="EPICS digital input 0",
auto_monitor=True,
)
di1 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI1",
kind=Kind.normal,
doc="EPICS digital input 1",
auto_monitor=True,
)
di2 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI2",
kind=Kind.normal,
doc="EPICS digital input 2",
auto_monitor=True,
)
di3 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI3",
kind=Kind.normal,
doc="EPICS digital input 3",
auto_monitor=True,
)
di4 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI4",
kind=Kind.normal,
doc="EPICS digital input 4",
auto_monitor=True,
)
enc_epics = Cpt(
EpicsSignalRO,
suffix="NIDAQ-ENC",
kind=Kind.normal,
doc="EPICS Encoder reading",
auto_monitor=True,
)
enc_epics = Cpt(EpicsSignalRO, suffix="NIDAQ-ENC", kind=Kind.normal, doc="EPICS Encoder reading", auto_monitor=True)
### Readback for BEC emitter ###
ai0_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, MEAN")
ai1_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 1, MEAN")
ai2_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 2, MEAN")
ai3_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 3, MEAN")
ai4_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 4, MEAN")
ai5_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 5, MEAN")
ai6_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 6, MEAN")
ai7_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 7, MEAN")
ai0_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, MEAN"
)
ai1_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 1, MEAN"
)
ai2_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 2, MEAN"
)
ai3_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 3, MEAN"
)
ai4_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 4, MEAN"
)
ai5_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 5, MEAN"
)
ai6_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 6, MEAN"
)
ai7_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 7, MEAN"
)
ai0_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, STD")
ai1_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 1, STD")
ai2_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 2, STD")
ai3_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 3, STD")
ai4_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 4, STD")
ai5_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 5, STD")
ai6_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 6, STD")
ai7_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 7, STD")
ai0_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, STD"
)
ai1_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 1, STD"
)
ai2_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 2, STD"
)
ai3_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 3, STD"
)
ai4_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 4, STD"
)
ai5_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 5, STD"
)
ai6_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 6, STD"
)
ai7_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 7, STD"
)
ci0_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 0, MEAN")
ci1_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 1, MEAN")
ci2_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 2, MEAN")
ci3_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 3, MEAN")
ci4_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 4, MEAN")
ci5_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 5, MEAN")
ci6_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 6, MEAN")
ci7_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 7, MEAN")
ci0_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 0, MEAN"
)
ci1_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 1, MEAN"
)
ci2_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 2, MEAN"
)
ci3_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 3, MEAN"
)
ci4_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 4, MEAN"
)
ci5_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 5, MEAN"
)
ci6_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 6, MEAN"
)
ci7_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 7, MEAN"
)
ci0_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 0. STD")
ci1_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 1. STD")
ci2_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 2. STD")
ci3_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 3. STD")
ci4_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 4. STD")
ci5_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 5. STD")
ci6_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 6. STD")
ci7_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 7. STD")
ci0_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 0. STD"
)
ci1_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 1. STD"
)
ci2_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 2. STD"
)
ci3_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 3. STD"
)
ci4_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 4. STD"
)
ci5_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 5. STD"
)
ci6_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 6. STD"
)
ci7_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 7. STD"
)
di0_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 0, MAX")
di1_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 1, MAX")
@@ -135,7 +333,7 @@ class Nidaq(PSIDeviceBase, NidaqControl):
def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.timeout_wait_for_signal = 5 # put 5s firsts
self._timeout_wait_for_pv = 3 # 3s timeout for pv calls
self._timeout_wait_for_pv = 3 # 3s timeout for pv calls
self.valid_scan_names = [
"xas_simple_scan",
"xas_simple_scan_with_xrd",
@@ -275,10 +473,13 @@ class Nidaq(PSIDeviceBase, NidaqControl):
Default values for signals should be set here.
"""
if not self.wait_for_condition(
condition= lambda: self.state.get() == NidaqState.STANDBY,
timeout = self.timeout_wait_for_signal,
check_stopped=True):
raise NidaqError(f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}")
condition=lambda: self.state.get() == NidaqState.STANDBY,
timeout=self.timeout_wait_for_signal,
check_stopped=True,
):
raise NidaqError(
f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}"
)
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
def on_stage(self) -> DeviceStatus | StatusBase | None:
@@ -292,22 +493,26 @@ class Nidaq(PSIDeviceBase, NidaqControl):
return None
if not self.wait_for_condition(
condition=lambda: self.state.get() == NidaqState.STANDBY, timeout=self.timeout_wait_for_signal, check_stopped=True
condition=lambda: self.state.get() == NidaqState.STANDBY,
timeout=self.timeout_wait_for_signal,
check_stopped=True,
):
raise NidaqError(
f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}"
)
self.scan_type.set(ScanType.TRIGGERED).wait(timeout = self._timeout_wait_for_pv)
self.scan_type.set(ScanType.TRIGGERED).wait(timeout=self._timeout_wait_for_pv)
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
self.stage_call.set(1).wait(timeout = self._timeout_wait_for_pv)
self.stage_call.set(1).wait(timeout=self._timeout_wait_for_pv)
if not self.wait_for_condition(
condition=lambda: self.state.get() == NidaqState.STAGE, timeout=self.timeout_wait_for_signal, check_stopped=True
condition=lambda: self.state.get() == NidaqState.STAGE,
timeout=self.timeout_wait_for_signal,
check_stopped=True,
):
raise NidaqError(
f"Device {self.name} has not been reached in state STAGE, current state {NidaqState(self.state.get())}"
)
self.kickoff_call.set(1).wait(timeout = self._timeout_wait_for_pv)
self.kickoff_call.set(1).wait(timeout=self._timeout_wait_for_pv)
logger.info(f"Device {self.name} was staged: {NidaqState(self.state.get())}")
def on_unstage(self) -> DeviceStatus | StatusBase | None:
@@ -361,10 +566,10 @@ class Nidaq(PSIDeviceBase, NidaqControl):
if not self._check_if_scan_name_is_valid():
return None
self.on_stop()
#TODO check if this wait can be removed. We are waiting in unstage anyways which will always be called afterwards
# TODO check if this wait can be removed. We are waiting in unstage anyways which will always be called afterwards
# Wait for device to be stopped
status = self.wait_for_condition(
condition = lambda: self.state.get() == NidaqState.STANDBY,
condition=lambda: self.state.get() == NidaqState.STANDBY,
check_stopped=True,
timeout=self.timeout_wait_for_signal,
)

View File

@@ -2,17 +2,22 @@ import enum
class NIDAQCompression(str, enum.Enum):
""" Options for Compression"""
"""Options for Compression"""
OFF = 0
ON = 1
class ScanType(int, enum.Enum):
""" Triggering options of the backend"""
"""Triggering options of the backend"""
TRIGGERED = 0
CONTINUOUS = 1
class NidaqState(int, enum.Enum):
""" Possible States of the NIDAQ backend"""
"""Possible States of the NIDAQ backend"""
DISABLED = 0
STANDBY = 1
STAGE = 2
@@ -20,8 +25,10 @@ class NidaqState(int, enum.Enum):
ACQUIRE = 4
UNSTAGE = 5
class ScanRates(int, enum.Enum):
""" Sampling Rate options for the backend, in kHZ and MHz"""
"""Sampling Rate options for the backend, in kHZ and MHz"""
HUNDRED_KHZ = 0
FIVE_HUNDRED_KHZ = 1
ONE_MHZ = 2
@@ -31,15 +38,19 @@ class ScanRates(int, enum.Enum):
TEN_MHZ = 6
FOURTEEN_THREE_MHZ = 7
class ReadoutRange(int, enum.Enum):
"""ReadoutRange in +-V"""
ONE_V = 0
TWO_V = 1
FIVE_V = 2
TEN_V = 3
class EncoderTypes(int, enum.Enum):
""" Encoder Types"""
"""Encoder Types"""
X_1 = 0
X_2 = 1
X_4 = 2

View File

@@ -1,17 +1,20 @@
""" ES2 Reference Foil Changer"""
"""ES2 Reference Foil Changer"""
from __future__ import annotations
import enum
from typing import TYPE_CHECKING
from ophyd import Component as Cpt
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd.status import DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.errors import DeviceStopError
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
class Status(int, enum.Enum):
"""Enum class for the status field"""
@@ -21,6 +24,7 @@ class Status(int, enum.Enum):
MOVING = 3
ERROR = 4
class OpMode(int, enum.Enum):
"""Enum class for the Operating Mode field"""
@@ -29,95 +33,122 @@ class OpMode(int, enum.Enum):
DIAGNOSTICMODE = 2
ERRORMODE = 3
class Reffoilchanger(PSIDeviceBase):
"""Class for the ES2 Reference Foil Changer"""
USER_ACCESS = ['insert']
USER_ACCESS = ["insert"]
inserted = Cpt(
EpicsSignalRO, suffix="ES2-REF:TRY-FilterInserted", kind="config", doc='Inserted indicator'
EpicsSignalRO, suffix="ES2-REF:TRY-FilterInserted", kind="config", doc="Inserted indicator"
)
retracted = Cpt(
EpicsSignalRO, suffix="ES2-REF:TRY-FilterRetracted", kind="config", doc='Retracted indicator'
)
moving = Cpt(
EpicsSignalRO, suffix="ES2-REF:ROTY.MOVN", kind="config", doc='Moving indicator'
EpicsSignalRO,
suffix="ES2-REF:TRY-FilterRetracted",
kind="config",
doc="Retracted indicator",
)
moving = Cpt(EpicsSignalRO, suffix="ES2-REF:ROTY.MOVN", kind="config", doc="Moving indicator")
status = Cpt(
EpicsSignal, suffix="ES2-REF:SELN-FilterState-ENUM_RBV", kind="config", doc='Status'
EpicsSignal, suffix="ES2-REF:SELN-FilterState-ENUM_RBV", kind="config", doc="Status"
)
op_mode = Cpt(
EpicsSignalWithRBV, suffix="ES2-REF:SELN-OpMode-ENUM", kind="config", doc='Status'
)
ref_set = Cpt(
EpicsSignal, suffix="ES2-REF:SELN-SET", kind="config", doc='Requested reference'
EpicsSignalWithRBV, suffix="ES2-REF:SELN-OpMode-ENUM", kind="config", doc="Status"
)
ref_set = Cpt(EpicsSignal, suffix="ES2-REF:SELN-SET", kind="config", doc="Requested reference")
ref_rb = Cpt(
EpicsSignalRO, suffix="ES2-REF:SELN-RB", kind="config", doc='Currently set reference'
EpicsSignalRO, suffix="ES2-REF:SELN-RB", kind="config", doc="Currently set reference"
)
foil01 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL01.DESC", kind="config", doc='Foil 01')
foil02 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL02.DESC", kind="config", doc='Foil 02')
foil03 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL03.DESC", kind="config", doc='Foil 03')
foil04 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL04.DESC", kind="config", doc='Foil 04')
foil05 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL05.DESC", kind="config", doc='Foil 05')
foil06 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL06.DESC", kind="config", doc='Foil 06')
foil07 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL07.DESC", kind="config", doc='Foil 07')
foil08 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL08.DESC", kind="config", doc='Foil 08')
foil09 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL09.DESC", kind="config", doc='Foil 09')
foil10 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL10.DESC", kind="config", doc='Foil 10')
foil11 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL11.DESC", kind="config", doc='Foil 11')
foil12 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL12.DESC", kind="config", doc='Foil 12')
foil13 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL13.DESC", kind="config", doc='Foil 13')
foil14 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL14.DESC", kind="config", doc='Foil 14')
foil15 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL15.DESC", kind="config", doc='Foil 15')
foil16 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL16.DESC", kind="config", doc='Foil 16')
foil17 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL17.DESC", kind="config", doc='Foil 17')
foil18 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL18.DESC", kind="config", doc='Foil 18')
foil19 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL19.DESC", kind="config", doc='Foil 19')
foil20 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL20.DESC", kind="config", doc='Foil 20')
foil21 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL21.DESC", kind="config", doc='Foil 21')
foil22 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL22.DESC", kind="config", doc='Foil 22')
foil23 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL23.DESC", kind="config", doc='Foil 23')
foil24 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL24.DESC", kind="config", doc='Foil 24')
foil25 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL25.DESC", kind="config", doc='Foil 25')
foil26 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL26.DESC", kind="config", doc='Foil 26')
foil27 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL27.DESC", kind="config", doc='Foil 27')
foil28 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL28.DESC", kind="config", doc='Foil 28')
foil29 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL29.DESC", kind="config", doc='Foil 29')
foil30 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL30.DESC", kind="config", doc='Foil 30')
foil31 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL31.DESC", kind="config", doc='Foil 31')
foil32 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL32.DESC", kind="config", doc='Foil 32')
foil33 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL33.DESC", kind="config", doc='Foil 33')
foil34 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL34.DESC", kind="config", doc='Foil 34')
foil35 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL35.DESC", kind="config", doc='Foil 35')
foil36 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL36.DESC", kind="config", doc='Foil 36')
foil37 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL37.DESC", kind="config", doc='Foil 37')
foil38 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL38.DESC", kind="config", doc='Foil 38')
foil01 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL01.DESC", kind="config", doc="Foil 01")
foil02 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL02.DESC", kind="config", doc="Foil 02")
foil03 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL03.DESC", kind="config", doc="Foil 03")
foil04 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL04.DESC", kind="config", doc="Foil 04")
foil05 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL05.DESC", kind="config", doc="Foil 05")
foil06 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL06.DESC", kind="config", doc="Foil 06")
foil07 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL07.DESC", kind="config", doc="Foil 07")
foil08 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL08.DESC", kind="config", doc="Foil 08")
foil09 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL09.DESC", kind="config", doc="Foil 09")
foil10 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL10.DESC", kind="config", doc="Foil 10")
foil11 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL11.DESC", kind="config", doc="Foil 11")
foil12 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL12.DESC", kind="config", doc="Foil 12")
foil13 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL13.DESC", kind="config", doc="Foil 13")
foil14 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL14.DESC", kind="config", doc="Foil 14")
foil15 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL15.DESC", kind="config", doc="Foil 15")
foil16 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL16.DESC", kind="config", doc="Foil 16")
foil17 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL17.DESC", kind="config", doc="Foil 17")
foil18 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL18.DESC", kind="config", doc="Foil 18")
foil19 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL19.DESC", kind="config", doc="Foil 19")
foil20 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL20.DESC", kind="config", doc="Foil 20")
foil21 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL21.DESC", kind="config", doc="Foil 21")
foil22 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL22.DESC", kind="config", doc="Foil 22")
foil23 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL23.DESC", kind="config", doc="Foil 23")
foil24 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL24.DESC", kind="config", doc="Foil 24")
foil25 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL25.DESC", kind="config", doc="Foil 25")
foil26 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL26.DESC", kind="config", doc="Foil 26")
foil27 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL27.DESC", kind="config", doc="Foil 27")
foil28 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL28.DESC", kind="config", doc="Foil 28")
foil29 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL29.DESC", kind="config", doc="Foil 29")
foil30 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL30.DESC", kind="config", doc="Foil 30")
foil31 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL31.DESC", kind="config", doc="Foil 31")
foil32 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL32.DESC", kind="config", doc="Foil 32")
foil33 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL33.DESC", kind="config", doc="Foil 33")
foil34 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL34.DESC", kind="config", doc="Foil 34")
foil35 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL35.DESC", kind="config", doc="Foil 35")
foil36 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL36.DESC", kind="config", doc="Foil 36")
foil37 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL37.DESC", kind="config", doc="Foil 37")
foil38 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL38.DESC", kind="config", doc="Foil 38")
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.foils = [
self.foil01, self.foil02, self.foil03, self.foil04, self.foil05, self.foil06,
self.foil07, self.foil08, self.foil09, self.foil10, self.foil11, self.foil12,
self.foil13, self.foil14, self.foil15, self.foil16, self.foil17, self.foil18,
self.foil19, self.foil20, self.foil21, self.foil22, self.foil23, self.foil24,
self.foil25, self.foil26, self.foil27, self.foil28, self.foil29, self.foil30,
self.foil31, self.foil32, self.foil33, self.foil34, self.foil35, self.foil36,
self.foil37, self.foil38
self.foil01,
self.foil02,
self.foil03,
self.foil04,
self.foil05,
self.foil06,
self.foil07,
self.foil08,
self.foil09,
self.foil10,
self.foil11,
self.foil12,
self.foil13,
self.foil14,
self.foil15,
self.foil16,
self.foil17,
self.foil18,
self.foil19,
self.foil20,
self.foil21,
self.foil22,
self.foil23,
self.foil24,
self.foil25,
self.foil26,
self.foil27,
self.foil28,
self.foil29,
self.foil30,
self.foil31,
self.foil32,
self.foil33,
self.foil34,
self.foil35,
self.foil36,
self.foil37,
self.foil38,
]
def insert(
self,
ref:str,
wait:bool = False
) -> DeviceStatus:
def insert(self, ref: str, wait: bool = False) -> DeviceStatus:
"""Insert a reference
Args:
ref (str) : Desired reference foil name, e.g. Fe or Pt
wait (bool): If you like to wait for the filling to finish. Default False.
Args:
ref (str) : Desired reference foil name, e.g. Fe or Pt
wait (bool): If you like to wait for the filling to finish. Default False.
"""
filter_number = -1
@@ -126,36 +157,38 @@ class Reffoilchanger(PSIDeviceBase):
filter_number = i + 1
break
if filter_number == -1:
raise ValueError(f'Requested foil ({ref}) is not in list of available foils')
raise ValueError(f"Requested foil ({ref}) is not in list of available foils")
if self.op_mode.get() == OpMode.USERMODE:
self.ref_set.put(filter_number)
def wait_for_status():
return (
(self.status.get() == Status.RETRACTED) or
(self.status.get() == Status.MOVING) or
(
self.ref_rb.get() < (filter_number + 0.2) and
self.ref_rb.get() > (filter_number - 0.2)
(self.status.get() == Status.RETRACTED)
or (self.status.get() == Status.MOVING)
or (
self.ref_rb.get() < (filter_number + 0.2)
and self.ref_rb.get() > (filter_number - 0.2)
)
)
timeout = 3
if not self.wait_for_condition(wait_for_status, timeout=timeout, check_stopped=True):
raise TimeoutError(
f"Reference foil changer did not retract the current foil within {timeout}s"
)
def wait_for_change_finished():
return self.status.get() == Status.INSERTED and self.op_mode == OpMode.USERMODE
# Wait until new reference foil is inserted
status = self.task_handler.submit_task(
task=self.wait_for_condition,
task_args=(wait_for_change_finished, 5, True))
task=self.wait_for_condition, task_args=(wait_for_change_finished, 5, True)
)
if wait:
status.wait()
return status
else:
raise DeviceStopError(
f'Reference foil changer must be in User Mode but is in {self.op_mode.get(as_string=True)}'
f"Reference foil changer must be in User Mode but is in {self.op_mode.get(as_string=True)}"
)

View File

@@ -1 +1,6 @@
from .mono_bragg_scans import XASSimpleScan, XASSimpleScanWithXRD, XASAdvancedScan, XASAdvancedScanWithXRD
from .mono_bragg_scans import (
XASAdvancedScan,
XASAdvancedScanWithXRD,
XASSimpleScan,
XASSimpleScanWithXRD,
)