refactor: formatting

This commit is contained in:
gac-x01da
2025-05-07 12:40:13 +02:00
parent 31ff28236b
commit 7b7a24b6c8
8 changed files with 436 additions and 184 deletions

View File

@@ -1,6 +1,6 @@
""" """
Pre-startup script for BEC client. This script is executed before the BEC client Pre-startup script for BEC client. This script is executed before the BEC client
is started. It can be used to add additional command line arguments. is started. It can be used to add additional command line arguments.
""" """
from bec_lib.service_config import ServiceConfig from bec_lib.service_config import ServiceConfig

View File

@@ -1,6 +1,6 @@
from __future__ import annotations from __future__ import annotations
from typing import Literal, TYPE_CHECKING from typing import TYPE_CHECKING, Literal
import numpy as np import numpy as np
from ophyd import Component as Cpt from ophyd import Component as Cpt
@@ -17,7 +17,7 @@ from debye_bec.devices.ionization_chambers.ionization_chamber_enums import (
AmplifierGain, AmplifierGain,
) )
if TYPE_CHECKING: #pragma: no cover if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo from bec_lib.devicemanager import ScanInfo
@@ -63,7 +63,7 @@ class HighVoltageSuppliesControl(Device):
class IonizationChamber0(PSIDeviceBase): class IonizationChamber0(PSIDeviceBase):
"""Ionization Chamber 0, prefix should be 'X01DA-'.""" """Ionization Chamber 0, prefix should be 'X01DA-'."""
USER_ACCESS = ['set_gain', 'set_filter', 'set_hv', 'set_grid', 'fill'] USER_ACCESS = ["set_gain", "set_filter", "set_hv", "set_grid", "fill"]
num = 1 num = 1
amp_signals = { amp_signals = {
@@ -85,9 +85,7 @@ class IonizationChamber0(PSIDeviceBase):
} }
amp = Dcpt(amp_signals) amp = Dcpt(amp_signals)
gmes = Cpt(GasMixSetupControl, suffix=f"ES-GMES:IC{num-1}") gmes = Cpt(GasMixSetupControl, suffix=f"ES-GMES:IC{num-1}")
gmes_status = Cpt( gmes_status = Cpt(EpicsSignalRO, suffix="ES-GMES:StatusMsg0", kind="config", doc="Status")
EpicsSignalRO, suffix="ES-GMES:StatusMsg0", kind="config", doc='Status'
)
hv = Cpt(HighVoltageSuppliesControl, suffix=f"ES1-IC{num-1}:") hv = Cpt(HighVoltageSuppliesControl, suffix=f"ES1-IC{num-1}:")
hv_en_signals = { hv_en_signals = {
"ext_ena": ( "ext_ena": (
@@ -191,7 +189,7 @@ class IonizationChamber0(PSIDeviceBase):
if not 0 <= hv <= 3000: if not 0 <= hv <= 3000:
raise ValueError(f"specified HV {hv} not within range [0 .. 3000]") raise ValueError(f"specified HV {hv} not within range [0 .. 3000]")
if not np.isclose(np.abs(hv - self.hv.grid_v.get()), 0, atol=3): if not np.isclose(np.abs(hv - self.hv.grid_v.get()), 0, atol=3):
raise ValueError(f"Grid {self.hv.grid_v.get()} must not be higher than HV {hv}!") raise ValueError(f"Grid {self.hv.grid_v.get()} must not be higher than HV {hv}!")
if not self.hv_en.ena.get() == 1: if not self.hv_en.ena.get() == 1:
@@ -219,7 +217,7 @@ class IonizationChamber0(PSIDeviceBase):
if not 0 <= grid <= 3000: if not 0 <= grid <= 3000:
raise ValueError(f"specified Grid {grid} not within range [0 .. 3000]") raise ValueError(f"specified Grid {grid} not within range [0 .. 3000]")
if not np.isclose(np.abs(grid - self.hv.hv_v.get()), 0, atol=3): if not np.isclose(np.abs(grid - self.hv.hv_v.get()), 0, atol=3):
raise ValueError(f"Grid {grid} must not be higher than HV {self.hv.hv_v.get()}!") raise ValueError(f"Grid {grid} must not be higher than HV {self.hv.hv_v.get()}!")
if not self.hv_en.ena.get() == 1: if not self.hv_en.ena.get() == 1:
@@ -318,9 +316,7 @@ class IonizationChamber1(IonizationChamber0):
} }
amp = Dcpt(amp_signals) amp = Dcpt(amp_signals)
gmes = Cpt(GasMixSetupControl, suffix=f"ES-GMES:IC{num-1}") gmes = Cpt(GasMixSetupControl, suffix=f"ES-GMES:IC{num-1}")
gmes_status = Cpt( gmes_status = Cpt(EpicsSignalRO, suffix="ES-GMES:StatusMsg0", kind="config", doc="Status")
EpicsSignalRO, suffix="ES-GMES:StatusMsg0", kind="config", doc='Status'
)
hv = Cpt(HighVoltageSuppliesControl, suffix=f"ES2-IC{num-1}:") hv = Cpt(HighVoltageSuppliesControl, suffix=f"ES2-IC{num-1}:")
hv_en_signals = { hv_en_signals = {
"ext_ena": ( "ext_ena": (
@@ -356,9 +352,7 @@ class IonizationChamber2(IonizationChamber0):
} }
amp = Dcpt(amp_signals) amp = Dcpt(amp_signals)
gmes = Cpt(GasMixSetupControl, suffix=f"ES-GMES:IC{num-1}") gmes = Cpt(GasMixSetupControl, suffix=f"ES-GMES:IC{num-1}")
gmes_status = Cpt( gmes_status = Cpt(EpicsSignalRO, suffix="ES-GMES:StatusMsg0", kind="config", doc="Status")
EpicsSignalRO, suffix="ES-GMES:StatusMsg0", kind="config", doc='Status'
)
hv = Cpt(HighVoltageSuppliesControl, suffix=f"ES2-IC{num-1}:") hv = Cpt(HighVoltageSuppliesControl, suffix=f"ES2-IC{num-1}:")
hv_en_signals = { hv_en_signals = {
"ext_ena": ( "ext_ena": (

View File

@@ -14,7 +14,7 @@ from typing import Any, Literal
from bec_lib.devicemanager import ScanInfo from bec_lib.devicemanager import ScanInfo
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import DeviceStatus, StatusBase, Signal from ophyd import DeviceStatus, Signal, StatusBase
from ophyd.status import SubscriptionStatus from ophyd.status import SubscriptionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.errors import DeviceStopError from ophyd_devices.utils.errors import DeviceStopError
@@ -201,7 +201,11 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
# Load the scan parameters to the controller # Load the scan parameters to the controller
self.scan_control.scan_load.put(1) self.scan_control.scan_load.put(1)
# Wait for params to be checked from controller # Wait for params to be checked from controller
self.wait_for_signal(self.scan_control.scan_msg, ScanControlLoadMessage.SUCCESS, timeout=self.timeout_for_pvwait) self.wait_for_signal(
self.scan_control.scan_msg,
ScanControlLoadMessage.SUCCESS,
timeout=self.timeout_for_pvwait,
)
return None return None
def on_unstage(self) -> DeviceStatus | StatusBase | None: def on_unstage(self) -> DeviceStatus | StatusBase | None:
@@ -231,6 +235,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
if value == ScanControlLoadMessage.PENDING: if value == ScanControlLoadMessage.PENDING:
return True return True
return False return False
status = SubscriptionStatus(self.scan_control.scan_msg, callback=callback) status = SubscriptionStatus(self.scan_control.scan_msg, callback=callback)
self.scan_control.scan_val_reset.put(1) self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait) status.wait(timeout=self.timeout_for_pvwait)
@@ -269,11 +274,12 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
if scan_duration < 0.1 if scan_duration < 0.1
else self.scan_control.scan_start_timer.put else self.scan_control.scan_start_timer.put
) )
def callback(*, old_value, value, **kwargs): def callback(*, old_value, value, **kwargs):
if old_value == ScanControlScanStatus.READY and value == ScanControlScanStatus.RUNNING: if old_value == ScanControlScanStatus.READY and value == ScanControlScanStatus.RUNNING:
return True return True
return False return False
status = SubscriptionStatus(self.scan_control.scan_status, callback=callback) status = SubscriptionStatus(self.scan_control.scan_status, callback=callback)
start_func(1) start_func(1)
return status return status
@@ -326,7 +332,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
while time.time() - start_time < timeout: while time.time() - start_time < timeout:
if signal.get() == value: if signal.get() == value:
return None return None
if self.stopped is True: # Should this check be optional or configurable?! if self.stopped is True: # Should this check be optional or configurable?!
raise DeviceStopError(f"Device {self.name} was stopped while waiting for signal") raise DeviceStopError(f"Device {self.name} was stopped while waiting for signal")
time.sleep(0.1) time.sleep(0.1)
# If we end up here, the status did not resolve # If we end up here, the status did not resolve
@@ -448,7 +454,6 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
if hasattr(self.scan_parameter, key): if hasattr(self.scan_parameter, key):
setattr(self.scan_parameter, key, value) setattr(self.scan_parameter, key, value)
def _check_scan_msg(self, target_state: ScanControlLoadMessage) -> None: def _check_scan_msg(self, target_state: ScanControlLoadMessage) -> None:
"""Check if the scan message is gettting available """Check if the scan message is gettting available
@@ -474,7 +479,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
status = SubscriptionStatus(self.scan_control.scan_msg, callback=callback) status = SubscriptionStatus(self.scan_control.scan_msg, callback=callback)
self.scan_control.scan_val_reset.put(1) self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait) status.wait(timeout=self.timeout_for_pvwait)
# try: # try:
@@ -484,4 +489,3 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
# f"Timeout after {self.timeout_for_pvwait} while waiting for scan status," # f"Timeout after {self.timeout_for_pvwait} while waiting for scan status,"
# f" current state: {ScanControlScanStatus(self.scan_control.scan_msg.get())}" # f" current state: {ScanControlScanStatus(self.scan_control.scan_msg.get())}"
# ) from exc # ) from exc

View File

@@ -1,4 +1,4 @@
""" Module for additional utils of the Mo1 Bragg Positioner""" """Module for additional utils of the Mo1 Bragg Positioner"""
import numpy as np import numpy as np
from scipy.interpolate import BSpline from scipy.interpolate import BSpline

View File

@@ -1,18 +1,20 @@
from __future__ import annotations from __future__ import annotations
from typing import Literal, TYPE_CHECKING, cast
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from typing import TYPE_CHECKING, Literal, cast
from ophyd import Device, Kind, DeviceStatus, Component as Cpt
from ophyd import EpicsSignal, EpicsSignalRO, StatusBase
from ophyd_devices.sim.sim_signals import SetableSignal
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.sim.sim_signals import SetableSignal
from debye_bec.devices.nidaq.nidaq_enums import ( from debye_bec.devices.nidaq.nidaq_enums import (
NIDAQCompression,
ScanType,
NidaqState,
ScanRates,
ReadoutRange,
EncoderTypes, EncoderTypes,
NIDAQCompression,
NidaqState,
ReadoutRange,
ScanRates,
ScanType,
) )
if TYPE_CHECKING: # pragma: no cover if TYPE_CHECKING: # pragma: no cover
@@ -29,69 +31,265 @@ class NidaqControl(Device):
"""Nidaq control class with all PVs""" """Nidaq control class with all PVs"""
### Readback PVs for EpicsEmitter ### ### Readback PVs for EpicsEmitter ###
ai0 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI0", kind=Kind.normal, doc="EPICS analog input 0",auto_monitor=True) ai0 = Cpt(
ai1 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI1", kind=Kind.normal, doc="EPICS analog input 1",auto_monitor=True) EpicsSignalRO,
ai2 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI2", kind=Kind.normal, doc="EPICS analog input 2",auto_monitor=True) suffix="NIDAQ-AI0",
ai3 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI3", kind=Kind.normal, doc="EPICS analog input 3",auto_monitor=True) kind=Kind.normal,
ai4 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI4", kind=Kind.normal, doc="EPICS analog input 4",auto_monitor=True) doc="EPICS analog input 0",
ai5 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI5", kind=Kind.normal, doc="EPICS analog input 5",auto_monitor=True) auto_monitor=True,
ai6 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI6", kind=Kind.normal, doc="EPICS analog input 6",auto_monitor=True) )
ai7 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI7", kind=Kind.normal, doc="EPICS analog input 7",auto_monitor=True) ai1 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI1",
kind=Kind.normal,
doc="EPICS analog input 1",
auto_monitor=True,
)
ai2 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI2",
kind=Kind.normal,
doc="EPICS analog input 2",
auto_monitor=True,
)
ai3 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI3",
kind=Kind.normal,
doc="EPICS analog input 3",
auto_monitor=True,
)
ai4 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI4",
kind=Kind.normal,
doc="EPICS analog input 4",
auto_monitor=True,
)
ai5 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI5",
kind=Kind.normal,
doc="EPICS analog input 5",
auto_monitor=True,
)
ai6 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI6",
kind=Kind.normal,
doc="EPICS analog input 6",
auto_monitor=True,
)
ai7 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI7",
kind=Kind.normal,
doc="EPICS analog input 7",
auto_monitor=True,
)
ci0 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI0", kind=Kind.normal, doc="EPICS counter input 0", auto_monitor=True) ci0 = Cpt(
ci1 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI1", kind=Kind.normal, doc="EPICS counter input 1", auto_monitor=True) EpicsSignalRO,
ci2 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI2", kind=Kind.normal, doc="EPICS counter input 2", auto_monitor=True) suffix="NIDAQ-CI0",
ci3 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI3", kind=Kind.normal, doc="EPICS counter input 3", auto_monitor=True) kind=Kind.normal,
ci4 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI4", kind=Kind.normal, doc="EPICS counter input 4", auto_monitor=True) doc="EPICS counter input 0",
ci5 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI5", kind=Kind.normal, doc="EPICS counter input 5", auto_monitor=True) auto_monitor=True,
ci6 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI6", kind=Kind.normal, doc="EPICS counter input 6", auto_monitor=True) )
ci7 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI7", kind=Kind.normal, doc="EPICS counter input 7", auto_monitor=True) ci1 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI1",
kind=Kind.normal,
doc="EPICS counter input 1",
auto_monitor=True,
)
ci2 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI2",
kind=Kind.normal,
doc="EPICS counter input 2",
auto_monitor=True,
)
ci3 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI3",
kind=Kind.normal,
doc="EPICS counter input 3",
auto_monitor=True,
)
ci4 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI4",
kind=Kind.normal,
doc="EPICS counter input 4",
auto_monitor=True,
)
ci5 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI5",
kind=Kind.normal,
doc="EPICS counter input 5",
auto_monitor=True,
)
ci6 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI6",
kind=Kind.normal,
doc="EPICS counter input 6",
auto_monitor=True,
)
ci7 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI7",
kind=Kind.normal,
doc="EPICS counter input 7",
auto_monitor=True,
)
di0 = Cpt(EpicsSignalRO, suffix="NIDAQ-DI0", kind=Kind.normal, doc="EPICS digital input 0", auto_monitor=True) di0 = Cpt(
di1 = Cpt(EpicsSignalRO, suffix="NIDAQ-DI1", kind=Kind.normal, doc="EPICS digital input 1", auto_monitor=True) EpicsSignalRO,
di2 = Cpt(EpicsSignalRO, suffix="NIDAQ-DI2", kind=Kind.normal, doc="EPICS digital input 2", auto_monitor=True) suffix="NIDAQ-DI0",
di3 = Cpt(EpicsSignalRO, suffix="NIDAQ-DI3", kind=Kind.normal, doc="EPICS digital input 3", auto_monitor=True) kind=Kind.normal,
di4 = Cpt(EpicsSignalRO, suffix="NIDAQ-DI4", kind=Kind.normal, doc="EPICS digital input 4", auto_monitor=True) doc="EPICS digital input 0",
auto_monitor=True,
)
di1 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI1",
kind=Kind.normal,
doc="EPICS digital input 1",
auto_monitor=True,
)
di2 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI2",
kind=Kind.normal,
doc="EPICS digital input 2",
auto_monitor=True,
)
di3 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI3",
kind=Kind.normal,
doc="EPICS digital input 3",
auto_monitor=True,
)
di4 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI4",
kind=Kind.normal,
doc="EPICS digital input 4",
auto_monitor=True,
)
enc_epics = Cpt(
EpicsSignalRO,
suffix="NIDAQ-ENC",
kind=Kind.normal,
doc="EPICS Encoder reading",
auto_monitor=True,
)
enc_epics = Cpt(EpicsSignalRO, suffix="NIDAQ-ENC", kind=Kind.normal, doc="EPICS Encoder reading", auto_monitor=True)
### Readback for BEC emitter ### ### Readback for BEC emitter ###
ai0_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, MEAN") ai0_mean = Cpt(
ai1_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 1, MEAN") SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, MEAN"
ai2_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 2, MEAN") )
ai3_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 3, MEAN") ai1_mean = Cpt(
ai4_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 4, MEAN") SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 1, MEAN"
ai5_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 5, MEAN") )
ai6_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 6, MEAN") ai2_mean = Cpt(
ai7_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 7, MEAN") SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 2, MEAN"
)
ai3_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 3, MEAN"
)
ai4_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 4, MEAN"
)
ai5_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 5, MEAN"
)
ai6_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 6, MEAN"
)
ai7_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 7, MEAN"
)
ai0_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, STD") ai0_std_dev = Cpt(
ai1_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 1, STD") SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, STD"
ai2_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 2, STD") )
ai3_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 3, STD") ai1_std_dev = Cpt(
ai4_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 4, STD") SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 1, STD"
ai5_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 5, STD") )
ai6_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 6, STD") ai2_std_dev = Cpt(
ai7_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 7, STD") SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 2, STD"
)
ai3_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 3, STD"
)
ai4_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 4, STD"
)
ai5_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 5, STD"
)
ai6_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 6, STD"
)
ai7_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 7, STD"
)
ci0_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 0, MEAN") ci0_mean = Cpt(
ci1_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 1, MEAN") SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 0, MEAN"
ci2_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 2, MEAN") )
ci3_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 3, MEAN") ci1_mean = Cpt(
ci4_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 4, MEAN") SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 1, MEAN"
ci5_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 5, MEAN") )
ci6_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 6, MEAN") ci2_mean = Cpt(
ci7_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 7, MEAN") SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 2, MEAN"
)
ci3_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 3, MEAN"
)
ci4_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 4, MEAN"
)
ci5_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 5, MEAN"
)
ci6_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 6, MEAN"
)
ci7_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 7, MEAN"
)
ci0_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 0. STD") ci0_std_dev = Cpt(
ci1_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 1. STD") SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 0. STD"
ci2_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 2. STD") )
ci3_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 3. STD") ci1_std_dev = Cpt(
ci4_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 4. STD") SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 1. STD"
ci5_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 5. STD") )
ci6_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 6. STD") ci2_std_dev = Cpt(
ci7_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 7. STD") SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 2. STD"
)
ci3_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 3. STD"
)
ci4_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 4. STD"
)
ci5_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 5. STD"
)
ci6_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 6. STD"
)
ci7_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 7. STD"
)
di0_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 0, MAX") di0_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 0, MAX")
di1_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 1, MAX") di1_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 1, MAX")
@@ -135,7 +333,7 @@ class Nidaq(PSIDeviceBase, NidaqControl):
def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs): def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.timeout_wait_for_signal = 5 # put 5s firsts self.timeout_wait_for_signal = 5 # put 5s firsts
self._timeout_wait_for_pv = 3 # 3s timeout for pv calls self._timeout_wait_for_pv = 3 # 3s timeout for pv calls
self.valid_scan_names = [ self.valid_scan_names = [
"xas_simple_scan", "xas_simple_scan",
"xas_simple_scan_with_xrd", "xas_simple_scan_with_xrd",
@@ -275,10 +473,13 @@ class Nidaq(PSIDeviceBase, NidaqControl):
Default values for signals should be set here. Default values for signals should be set here.
""" """
if not self.wait_for_condition( if not self.wait_for_condition(
condition= lambda: self.state.get() == NidaqState.STANDBY, condition=lambda: self.state.get() == NidaqState.STANDBY,
timeout = self.timeout_wait_for_signal, timeout=self.timeout_wait_for_signal,
check_stopped=True): check_stopped=True,
raise NidaqError(f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}") ):
raise NidaqError(
f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}"
)
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv) self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
def on_stage(self) -> DeviceStatus | StatusBase | None: def on_stage(self) -> DeviceStatus | StatusBase | None:
@@ -292,22 +493,26 @@ class Nidaq(PSIDeviceBase, NidaqControl):
return None return None
if not self.wait_for_condition( if not self.wait_for_condition(
condition=lambda: self.state.get() == NidaqState.STANDBY, timeout=self.timeout_wait_for_signal, check_stopped=True condition=lambda: self.state.get() == NidaqState.STANDBY,
timeout=self.timeout_wait_for_signal,
check_stopped=True,
): ):
raise NidaqError( raise NidaqError(
f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}" f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}"
) )
self.scan_type.set(ScanType.TRIGGERED).wait(timeout = self._timeout_wait_for_pv) self.scan_type.set(ScanType.TRIGGERED).wait(timeout=self._timeout_wait_for_pv)
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv) self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
self.stage_call.set(1).wait(timeout = self._timeout_wait_for_pv) self.stage_call.set(1).wait(timeout=self._timeout_wait_for_pv)
if not self.wait_for_condition( if not self.wait_for_condition(
condition=lambda: self.state.get() == NidaqState.STAGE, timeout=self.timeout_wait_for_signal, check_stopped=True condition=lambda: self.state.get() == NidaqState.STAGE,
timeout=self.timeout_wait_for_signal,
check_stopped=True,
): ):
raise NidaqError( raise NidaqError(
f"Device {self.name} has not been reached in state STAGE, current state {NidaqState(self.state.get())}" f"Device {self.name} has not been reached in state STAGE, current state {NidaqState(self.state.get())}"
) )
self.kickoff_call.set(1).wait(timeout = self._timeout_wait_for_pv) self.kickoff_call.set(1).wait(timeout=self._timeout_wait_for_pv)
logger.info(f"Device {self.name} was staged: {NidaqState(self.state.get())}") logger.info(f"Device {self.name} was staged: {NidaqState(self.state.get())}")
def on_unstage(self) -> DeviceStatus | StatusBase | None: def on_unstage(self) -> DeviceStatus | StatusBase | None:
@@ -361,10 +566,10 @@ class Nidaq(PSIDeviceBase, NidaqControl):
if not self._check_if_scan_name_is_valid(): if not self._check_if_scan_name_is_valid():
return None return None
self.on_stop() self.on_stop()
#TODO check if this wait can be removed. We are waiting in unstage anyways which will always be called afterwards # TODO check if this wait can be removed. We are waiting in unstage anyways which will always be called afterwards
# Wait for device to be stopped # Wait for device to be stopped
status = self.wait_for_condition( status = self.wait_for_condition(
condition = lambda: self.state.get() == NidaqState.STANDBY, condition=lambda: self.state.get() == NidaqState.STANDBY,
check_stopped=True, check_stopped=True,
timeout=self.timeout_wait_for_signal, timeout=self.timeout_wait_for_signal,
) )

View File

@@ -2,17 +2,22 @@ import enum
class NIDAQCompression(str, enum.Enum): class NIDAQCompression(str, enum.Enum):
""" Options for Compression""" """Options for Compression"""
OFF = 0 OFF = 0
ON = 1 ON = 1
class ScanType(int, enum.Enum): class ScanType(int, enum.Enum):
""" Triggering options of the backend""" """Triggering options of the backend"""
TRIGGERED = 0 TRIGGERED = 0
CONTINUOUS = 1 CONTINUOUS = 1
class NidaqState(int, enum.Enum): class NidaqState(int, enum.Enum):
""" Possible States of the NIDAQ backend""" """Possible States of the NIDAQ backend"""
DISABLED = 0 DISABLED = 0
STANDBY = 1 STANDBY = 1
STAGE = 2 STAGE = 2
@@ -20,8 +25,10 @@ class NidaqState(int, enum.Enum):
ACQUIRE = 4 ACQUIRE = 4
UNSTAGE = 5 UNSTAGE = 5
class ScanRates(int, enum.Enum): class ScanRates(int, enum.Enum):
""" Sampling Rate options for the backend, in kHZ and MHz""" """Sampling Rate options for the backend, in kHZ and MHz"""
HUNDRED_KHZ = 0 HUNDRED_KHZ = 0
FIVE_HUNDRED_KHZ = 1 FIVE_HUNDRED_KHZ = 1
ONE_MHZ = 2 ONE_MHZ = 2
@@ -31,15 +38,19 @@ class ScanRates(int, enum.Enum):
TEN_MHZ = 6 TEN_MHZ = 6
FOURTEEN_THREE_MHZ = 7 FOURTEEN_THREE_MHZ = 7
class ReadoutRange(int, enum.Enum): class ReadoutRange(int, enum.Enum):
"""ReadoutRange in +-V""" """ReadoutRange in +-V"""
ONE_V = 0 ONE_V = 0
TWO_V = 1 TWO_V = 1
FIVE_V = 2 FIVE_V = 2
TEN_V = 3 TEN_V = 3
class EncoderTypes(int, enum.Enum): class EncoderTypes(int, enum.Enum):
""" Encoder Types""" """Encoder Types"""
X_1 = 0 X_1 = 0
X_2 = 1 X_2 = 1
X_4 = 2 X_4 = 2

View File

@@ -1,17 +1,20 @@
""" ES2 Reference Foil Changer""" """ES2 Reference Foil Changer"""
from __future__ import annotations from __future__ import annotations
import enum import enum
from typing import TYPE_CHECKING
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd.status import DeviceStatus from ophyd.status import DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.errors import DeviceStopError from ophyd_devices.utils.errors import DeviceStopError
from typing import TYPE_CHECKING
if TYPE_CHECKING: if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo from bec_lib.devicemanager import ScanInfo
class Status(int, enum.Enum): class Status(int, enum.Enum):
"""Enum class for the status field""" """Enum class for the status field"""
@@ -21,6 +24,7 @@ class Status(int, enum.Enum):
MOVING = 3 MOVING = 3
ERROR = 4 ERROR = 4
class OpMode(int, enum.Enum): class OpMode(int, enum.Enum):
"""Enum class for the Operating Mode field""" """Enum class for the Operating Mode field"""
@@ -29,95 +33,122 @@ class OpMode(int, enum.Enum):
DIAGNOSTICMODE = 2 DIAGNOSTICMODE = 2
ERRORMODE = 3 ERRORMODE = 3
class Reffoilchanger(PSIDeviceBase): class Reffoilchanger(PSIDeviceBase):
"""Class for the ES2 Reference Foil Changer""" """Class for the ES2 Reference Foil Changer"""
USER_ACCESS = ['insert'] USER_ACCESS = ["insert"]
inserted = Cpt( inserted = Cpt(
EpicsSignalRO, suffix="ES2-REF:TRY-FilterInserted", kind="config", doc='Inserted indicator' EpicsSignalRO, suffix="ES2-REF:TRY-FilterInserted", kind="config", doc="Inserted indicator"
) )
retracted = Cpt( retracted = Cpt(
EpicsSignalRO, suffix="ES2-REF:TRY-FilterRetracted", kind="config", doc='Retracted indicator' EpicsSignalRO,
) suffix="ES2-REF:TRY-FilterRetracted",
moving = Cpt( kind="config",
EpicsSignalRO, suffix="ES2-REF:ROTY.MOVN", kind="config", doc='Moving indicator' doc="Retracted indicator",
) )
moving = Cpt(EpicsSignalRO, suffix="ES2-REF:ROTY.MOVN", kind="config", doc="Moving indicator")
status = Cpt( status = Cpt(
EpicsSignal, suffix="ES2-REF:SELN-FilterState-ENUM_RBV", kind="config", doc='Status' EpicsSignal, suffix="ES2-REF:SELN-FilterState-ENUM_RBV", kind="config", doc="Status"
) )
op_mode = Cpt( op_mode = Cpt(
EpicsSignalWithRBV, suffix="ES2-REF:SELN-OpMode-ENUM", kind="config", doc='Status' EpicsSignalWithRBV, suffix="ES2-REF:SELN-OpMode-ENUM", kind="config", doc="Status"
)
ref_set = Cpt(
EpicsSignal, suffix="ES2-REF:SELN-SET", kind="config", doc='Requested reference'
) )
ref_set = Cpt(EpicsSignal, suffix="ES2-REF:SELN-SET", kind="config", doc="Requested reference")
ref_rb = Cpt( ref_rb = Cpt(
EpicsSignalRO, suffix="ES2-REF:SELN-RB", kind="config", doc='Currently set reference' EpicsSignalRO, suffix="ES2-REF:SELN-RB", kind="config", doc="Currently set reference"
) )
foil01 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL01.DESC", kind="config", doc='Foil 01') foil01 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL01.DESC", kind="config", doc="Foil 01")
foil02 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL02.DESC", kind="config", doc='Foil 02') foil02 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL02.DESC", kind="config", doc="Foil 02")
foil03 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL03.DESC", kind="config", doc='Foil 03') foil03 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL03.DESC", kind="config", doc="Foil 03")
foil04 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL04.DESC", kind="config", doc='Foil 04') foil04 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL04.DESC", kind="config", doc="Foil 04")
foil05 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL05.DESC", kind="config", doc='Foil 05') foil05 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL05.DESC", kind="config", doc="Foil 05")
foil06 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL06.DESC", kind="config", doc='Foil 06') foil06 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL06.DESC", kind="config", doc="Foil 06")
foil07 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL07.DESC", kind="config", doc='Foil 07') foil07 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL07.DESC", kind="config", doc="Foil 07")
foil08 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL08.DESC", kind="config", doc='Foil 08') foil08 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL08.DESC", kind="config", doc="Foil 08")
foil09 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL09.DESC", kind="config", doc='Foil 09') foil09 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL09.DESC", kind="config", doc="Foil 09")
foil10 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL10.DESC", kind="config", doc='Foil 10') foil10 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL10.DESC", kind="config", doc="Foil 10")
foil11 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL11.DESC", kind="config", doc='Foil 11') foil11 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL11.DESC", kind="config", doc="Foil 11")
foil12 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL12.DESC", kind="config", doc='Foil 12') foil12 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL12.DESC", kind="config", doc="Foil 12")
foil13 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL13.DESC", kind="config", doc='Foil 13') foil13 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL13.DESC", kind="config", doc="Foil 13")
foil14 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL14.DESC", kind="config", doc='Foil 14') foil14 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL14.DESC", kind="config", doc="Foil 14")
foil15 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL15.DESC", kind="config", doc='Foil 15') foil15 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL15.DESC", kind="config", doc="Foil 15")
foil16 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL16.DESC", kind="config", doc='Foil 16') foil16 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL16.DESC", kind="config", doc="Foil 16")
foil17 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL17.DESC", kind="config", doc='Foil 17') foil17 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL17.DESC", kind="config", doc="Foil 17")
foil18 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL18.DESC", kind="config", doc='Foil 18') foil18 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL18.DESC", kind="config", doc="Foil 18")
foil19 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL19.DESC", kind="config", doc='Foil 19') foil19 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL19.DESC", kind="config", doc="Foil 19")
foil20 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL20.DESC", kind="config", doc='Foil 20') foil20 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL20.DESC", kind="config", doc="Foil 20")
foil21 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL21.DESC", kind="config", doc='Foil 21') foil21 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL21.DESC", kind="config", doc="Foil 21")
foil22 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL22.DESC", kind="config", doc='Foil 22') foil22 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL22.DESC", kind="config", doc="Foil 22")
foil23 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL23.DESC", kind="config", doc='Foil 23') foil23 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL23.DESC", kind="config", doc="Foil 23")
foil24 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL24.DESC", kind="config", doc='Foil 24') foil24 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL24.DESC", kind="config", doc="Foil 24")
foil25 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL25.DESC", kind="config", doc='Foil 25') foil25 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL25.DESC", kind="config", doc="Foil 25")
foil26 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL26.DESC", kind="config", doc='Foil 26') foil26 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL26.DESC", kind="config", doc="Foil 26")
foil27 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL27.DESC", kind="config", doc='Foil 27') foil27 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL27.DESC", kind="config", doc="Foil 27")
foil28 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL28.DESC", kind="config", doc='Foil 28') foil28 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL28.DESC", kind="config", doc="Foil 28")
foil29 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL29.DESC", kind="config", doc='Foil 29') foil29 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL29.DESC", kind="config", doc="Foil 29")
foil30 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL30.DESC", kind="config", doc='Foil 30') foil30 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL30.DESC", kind="config", doc="Foil 30")
foil31 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL31.DESC", kind="config", doc='Foil 31') foil31 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL31.DESC", kind="config", doc="Foil 31")
foil32 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL32.DESC", kind="config", doc='Foil 32') foil32 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL32.DESC", kind="config", doc="Foil 32")
foil33 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL33.DESC", kind="config", doc='Foil 33') foil33 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL33.DESC", kind="config", doc="Foil 33")
foil34 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL34.DESC", kind="config", doc='Foil 34') foil34 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL34.DESC", kind="config", doc="Foil 34")
foil35 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL35.DESC", kind="config", doc='Foil 35') foil35 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL35.DESC", kind="config", doc="Foil 35")
foil36 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL36.DESC", kind="config", doc='Foil 36') foil36 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL36.DESC", kind="config", doc="Foil 36")
foil37 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL37.DESC", kind="config", doc='Foil 37') foil37 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL37.DESC", kind="config", doc="Foil 37")
foil38 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL38.DESC", kind="config", doc='Foil 38') foil38 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL38.DESC", kind="config", doc="Foil 38")
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.foils = [ self.foils = [
self.foil01, self.foil02, self.foil03, self.foil04, self.foil05, self.foil06, self.foil01,
self.foil07, self.foil08, self.foil09, self.foil10, self.foil11, self.foil12, self.foil02,
self.foil13, self.foil14, self.foil15, self.foil16, self.foil17, self.foil18, self.foil03,
self.foil19, self.foil20, self.foil21, self.foil22, self.foil23, self.foil24, self.foil04,
self.foil25, self.foil26, self.foil27, self.foil28, self.foil29, self.foil30, self.foil05,
self.foil31, self.foil32, self.foil33, self.foil34, self.foil35, self.foil36, self.foil06,
self.foil37, self.foil38 self.foil07,
self.foil08,
self.foil09,
self.foil10,
self.foil11,
self.foil12,
self.foil13,
self.foil14,
self.foil15,
self.foil16,
self.foil17,
self.foil18,
self.foil19,
self.foil20,
self.foil21,
self.foil22,
self.foil23,
self.foil24,
self.foil25,
self.foil26,
self.foil27,
self.foil28,
self.foil29,
self.foil30,
self.foil31,
self.foil32,
self.foil33,
self.foil34,
self.foil35,
self.foil36,
self.foil37,
self.foil38,
] ]
def insert( def insert(self, ref: str, wait: bool = False) -> DeviceStatus:
self,
ref:str,
wait:bool = False
) -> DeviceStatus:
"""Insert a reference """Insert a reference
Args: Args:
ref (str) : Desired reference foil name, e.g. Fe or Pt ref (str) : Desired reference foil name, e.g. Fe or Pt
wait (bool): If you like to wait for the filling to finish. Default False. wait (bool): If you like to wait for the filling to finish. Default False.
""" """
filter_number = -1 filter_number = -1
@@ -126,36 +157,38 @@ class Reffoilchanger(PSIDeviceBase):
filter_number = i + 1 filter_number = i + 1
break break
if filter_number == -1: if filter_number == -1:
raise ValueError(f'Requested foil ({ref}) is not in list of available foils') raise ValueError(f"Requested foil ({ref}) is not in list of available foils")
if self.op_mode.get() == OpMode.USERMODE: if self.op_mode.get() == OpMode.USERMODE:
self.ref_set.put(filter_number) self.ref_set.put(filter_number)
def wait_for_status(): def wait_for_status():
return ( return (
(self.status.get() == Status.RETRACTED) or (self.status.get() == Status.RETRACTED)
(self.status.get() == Status.MOVING) or or (self.status.get() == Status.MOVING)
( or (
self.ref_rb.get() < (filter_number + 0.2) and self.ref_rb.get() < (filter_number + 0.2)
self.ref_rb.get() > (filter_number - 0.2) and self.ref_rb.get() > (filter_number - 0.2)
) )
) )
timeout = 3 timeout = 3
if not self.wait_for_condition(wait_for_status, timeout=timeout, check_stopped=True): if not self.wait_for_condition(wait_for_status, timeout=timeout, check_stopped=True):
raise TimeoutError( raise TimeoutError(
f"Reference foil changer did not retract the current foil within {timeout}s" f"Reference foil changer did not retract the current foil within {timeout}s"
) )
def wait_for_change_finished(): def wait_for_change_finished():
return self.status.get() == Status.INSERTED and self.op_mode == OpMode.USERMODE return self.status.get() == Status.INSERTED and self.op_mode == OpMode.USERMODE
# Wait until new reference foil is inserted # Wait until new reference foil is inserted
status = self.task_handler.submit_task( status = self.task_handler.submit_task(
task=self.wait_for_condition, task=self.wait_for_condition, task_args=(wait_for_change_finished, 5, True)
task_args=(wait_for_change_finished, 5, True)) )
if wait: if wait:
status.wait() status.wait()
return status return status
else: else:
raise DeviceStopError( raise DeviceStopError(
f'Reference foil changer must be in User Mode but is in {self.op_mode.get(as_string=True)}' f"Reference foil changer must be in User Mode but is in {self.op_mode.get(as_string=True)}"
) )

View File

@@ -1 +1,6 @@
from .mono_bragg_scans import XASSimpleScan, XASSimpleScanWithXRD, XASAdvancedScan, XASAdvancedScanWithXRD from .mono_bragg_scans import (
XASAdvancedScan,
XASAdvancedScanWithXRD,
XASSimpleScan,
XASSimpleScanWithXRD,
)