fix: formatting

This commit was merged in pull request #50.
This commit is contained in:
2025-03-18 19:26:31 +01:00
parent 01a17cbe3a
commit d3ab7316ac
9 changed files with 284 additions and 248 deletions

View File

@@ -1,32 +1,29 @@
from ophyd_devices.devices.areadetector.cam import AravisDetectorCam
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd import ADComponent as ADCpt
from ophyd import ADBase, Kind
# from ophyd_devices.sim.sim_signals import SetableSignal
# from ophyd_devices.utils.psi_component import PSIComponent, SignalType
import numpy as np
from ophyd import ADBase
from ophyd import ADComponent as ADCpt
from ophyd import Kind
from ophyd_devices.devices.areadetector.cam import AravisDetectorCam
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
class BaslerCamBase(ADBase):
cam1 = ADCpt(AravisDetectorCam, "cam1:")
image1 = ADCpt(ImagePlugin_V35, 'image1:')
image1 = ADCpt(ImagePlugin_V35, "image1:")
class BaslerCam(PSIDeviceBase, BaslerCamBase):
# preview_2d = PSIComponent(SetableSignal, signal_type=SignalType.PREVIEW, ndim=2, kind=Kind.omitted)
def emit_to_bec(self, *args, obj=None, old_value=None, value=None, **kwargs):
width = self.image1.array_size.width.get()
height = self.image1.array_size.height.get()
data = np.reshape(value, (height,width))
data = np.reshape(value, (height, width))
# self.preview_2d.put(data)
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data)
def on_connected(self):
self.image1.array_data.subscribe(self.emit_to_bec, run=False)
self.image1.array_data.subscribe(self.emit_to_bec, run=False)

View File

@@ -1,26 +1,25 @@
import numpy as np
from ophyd import ADBase
from ophyd import ADComponent as ADCpt
from ophyd import Component as Cpt
from ophyd import Device
from ophyd_devices.devices.areadetector.cam import ProsilicaDetectorCam
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd import Component as Cpt
from ophyd import ADComponent as ADCpt
from ophyd import Device, ADBase
import numpy as np
class ProsilicaCamBase(ADBase):
cam1 = ADCpt(ProsilicaDetectorCam, "cam1:")
image1 = ADCpt(ImagePlugin_V35, 'image1:')
image1 = ADCpt(ImagePlugin_V35, "image1:")
class ProsilicaCam(PSIDeviceBase, ProsilicaCamBase):
def emit_to_bec(self, *args, obj=None, old_value=None, value=None, **kwargs):
width = self.image1.array_size.width.get()
height = self.image1.array_size.height.get()
data = np.reshape(value, (height,width))
data = np.reshape(value, (height, width))
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data)
def on_connected(self):
self.image1.array_data.subscribe(self.emit_to_bec, run=False)
self.image1.array_data.subscribe(self.emit_to_bec, run=False)

View File

@@ -1,34 +1,35 @@
""" ES0 Filter Station"""
"""ES0 Filter Station"""
from typing import Literal
from ophyd import Component as Cpt
from ophyd import Device, Kind, EpicsSignal
from typing import Literal
from ophyd import Device, EpicsSignal, Kind
from ophyd_devices.utils import bec_utils
from typeguard import typechecked
from ophyd_devices.utils import bec_utils
class EpicsSignalWithRBVBit(EpicsSignal):
def __init__(self, prefix, *, bit:int, **kwargs):
def __init__(self, prefix, *, bit: int, **kwargs):
super().__init__(prefix, **kwargs)
self.bit = bit
self.bit = bit
@typechecked
def put(self, value:Literal[0,1], **kwargs):
def put(self, value: Literal[0, 1], **kwargs):
bit_value = super().get()
#convert to int
# convert to int
bit_value = int(bit_value)
if value ==1:
if value == 1:
new_value = bit_value | (1 << self.bit)
else:
new_value = bit_value & ~(1 << self.bit)
super().put(new_value, **kwargs)
def get(self, **kwargs) -> Literal[0,1]:
def get(self, **kwargs) -> Literal[0, 1]:
bit_value = super().get()
#convert to int
# convert to int
bit_value = int(bit_value)
if (bit_value & (1 << self.bit)) ==0:
if (bit_value & (1 << self.bit)) == 0:
return 0
return 1
@@ -36,18 +37,18 @@ class EpicsSignalWithRBVBit(EpicsSignal):
class ES0Filter(Device):
"""Class for the ES0 filter station X01DA-ES0-FI:"""
Mo400 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=1,kind="config",doc='Mo400 filter')
Mo300 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=2,kind="config",doc='Mo300 filter')
Mo200 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=3,kind="config",doc='Mo200 filter')
Zn500 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=4,kind="config",doc='Zn500 filter')
Zn250 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=5,kind="config",doc='Zn250 filter')
Zn125 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=6,kind="config",doc='Zn125 filter')
Zn50 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=7,kind="config",doc='Zn50 filter')
Zn25 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=8,kind="config",doc='Zn25 filter')
Al500 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=9,kind="config",doc='Al500 filter')
Al320 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=10,kind="config",doc='Al320 filter')
Al200 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=11,kind="config",doc='Al200 filter')
Al100 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=12,kind="config",doc='Al100 filter')
Al50 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=13,kind="config",doc='Al50 filter')
Al20 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=14,kind="config",doc='Al20 filter')
Al10 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=15,kind="config",doc='Al10 filter')
Mo400 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=1, kind="config", doc="Mo400 filter")
Mo300 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=2, kind="config", doc="Mo300 filter")
Mo200 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=3, kind="config", doc="Mo200 filter")
Zn500 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=4, kind="config", doc="Zn500 filter")
Zn250 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=5, kind="config", doc="Zn250 filter")
Zn125 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=6, kind="config", doc="Zn125 filter")
Zn50 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=7, kind="config", doc="Zn50 filter")
Zn25 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=8, kind="config", doc="Zn25 filter")
Al500 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=9, kind="config", doc="Al500 filter")
Al320 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=10, kind="config", doc="Al320 filter")
Al200 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=11, kind="config", doc="Al200 filter")
Al100 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=12, kind="config", doc="Al100 filter")
Al50 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=13, kind="config", doc="Al50 filter")
Al20 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=14, kind="config", doc="Al20 filter")
Al10 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=15, kind="config", doc="Al10 filter")

View File

@@ -1,106 +1,101 @@
from ophyd import Device,Kind,Component as Cpt, DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignalWithRBV, EpicsSignal, EpicsSignalRO
from ophyd.status import SubscriptionStatus, DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typing import Literal
from typeguard import typechecked
import numpy as np
from debye_bec.devices.ionization_chambers.ionization_chamber_enums import AmplifierEnable, AmplifierGain, AmplifierFilter
import numpy as np
from ophyd import Component as Cpt
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Kind
from ophyd.status import DeviceStatus, SubscriptionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typeguard import typechecked
from debye_bec.devices.ionization_chambers.ionization_chamber_enums import (
AmplifierEnable,
AmplifierFilter,
AmplifierGain,
)
class EpicsSignalSplit(EpicsSignal):
""" Wrapper around EpicsSignal with different read and write pv"""
"""Wrapper around EpicsSignal with different read and write pv"""
def __init__(self, prefix, **kwargs):
super().__init__(prefix + "-RB", write_pv=prefix+"Set", **kwargs)
super().__init__(prefix + "-RB", write_pv=prefix + "Set", **kwargs)
class GasMixSetupControl(Device):
"""GasMixSetup Control for Inonization Chamber 0"""
gas1_req = Cpt(
EpicsSignalWithRBV, suffix="Gas1Req", kind="config", doc='Gas 1 requirement'
)
gas1_req = Cpt(EpicsSignalWithRBV, suffix="Gas1Req", kind="config", doc="Gas 1 requirement")
conc1_req = Cpt(
EpicsSignalWithRBV, suffix="Conc1Req", kind="config", doc='Concentration 1 requirement'
)
gas2_req = Cpt(
EpicsSignalWithRBV, suffix="Gas2Req", kind="config", doc='Gas 2 requirement'
EpicsSignalWithRBV, suffix="Conc1Req", kind="config", doc="Concentration 1 requirement"
)
gas2_req = Cpt(EpicsSignalWithRBV, suffix="Gas2Req", kind="config", doc="Gas 2 requirement")
conc2_req = Cpt(
EpicsSignalWithRBV, suffix="Conc2Req", kind="config", doc='Concentration 2 requirement'
EpicsSignalWithRBV, suffix="Conc2Req", kind="config", doc="Concentration 2 requirement"
)
press_req = Cpt(
EpicsSignalWithRBV, suffix="PressReq", kind="config", doc='Pressure requirement'
)
fill = Cpt(
EpicsSignal, suffix="Fill", kind="config", doc='Fill the chamber'
)
status = Cpt(
EpicsSignalRO, suffix="Status", kind="config", doc='Status'
)
gas1 = Cpt(
EpicsSignalRO, suffix="Gas1", kind="config", doc='Gas 1'
)
conc1 = Cpt(
EpicsSignalRO, suffix="Conc1", kind="config", doc='Concentration 1'
)
gas2 = Cpt(
EpicsSignalRO, suffix="Gas2", kind="config", doc='Gas 2'
)
conc2 = Cpt(
EpicsSignalRO, suffix="Conc2", kind="config", doc='Concentration 2'
)
press = Cpt(
EpicsSignalRO, suffix="PressTransm", kind="config", doc='Current Pressure'
)
status_msg = Cpt(
EpicsSignalRO, suffix="StatusMsg0", kind="config", doc='Status'
EpicsSignalWithRBV, suffix="PressReq", kind="config", doc="Pressure requirement"
)
fill = Cpt(EpicsSignal, suffix="Fill", kind="config", doc="Fill the chamber")
status = Cpt(EpicsSignalRO, suffix="Status", kind="config", doc="Status")
gas1 = Cpt(EpicsSignalRO, suffix="Gas1", kind="config", doc="Gas 1")
conc1 = Cpt(EpicsSignalRO, suffix="Conc1", kind="config", doc="Concentration 1")
gas2 = Cpt(EpicsSignalRO, suffix="Gas2", kind="config", doc="Gas 2")
conc2 = Cpt(EpicsSignalRO, suffix="Conc2", kind="config", doc="Concentration 2")
press = Cpt(EpicsSignalRO, suffix="PressTransm", kind="config", doc="Current Pressure")
status_msg = Cpt(EpicsSignalRO, suffix="StatusMsg0", kind="config", doc="Status")
class HighVoltageSuppliesControl(Device):
""" HighVoltage Supplies Control for Ionization Chamber 0"""
"""HighVoltage Supplies Control for Ionization Chamber 0"""
hv_v = Cpt(EpicsSignalSplit, suffix="HV1-V", kind="config", doc="HV voltage")
hv_i = Cpt(EpicsSignalSplit, suffix="HV1-I", kind="config", doc="HV current")
grid_v = Cpt(EpicsSignalSplit, suffix="HV2-V", kind="config", doc="Grid voltage")
grid_i = Cpt(EpicsSignalSplit, suffix="HV2-I", kind="config", doc="Grid current")
hv_v = Cpt(
EpicsSignalSplit, suffix="HV1-V", kind="config", doc='HV voltage'
)
hv_i = Cpt(
EpicsSignalSplit, suffix="HV1-I", kind="config", doc='HV current'
)
grid_v = Cpt(
EpicsSignalSplit, suffix="HV2-V", kind="config", doc='Grid voltage'
)
grid_i = Cpt(
EpicsSignalSplit, suffix="HV2-I", kind="config", doc='Grid current'
)
class IonizationChamber0(PSIDeviceBase):
"""Ionization Chamber 0, prefix should be 'X01DA-'."""
num=1
num = 1
amp_signals = {
"cOnOff": (EpicsSignal, (f"ES:AMP5004.cOnOff{num}"), {"kind":"config", "doc":f'Enable ch{num} -> IC{num-1}'}),
"cGain_ENUM": (EpicsSignalWithRBV, (f"ES:AMP5004:cGain{num}_ENUM"), {"kind":"config", "doc":f'Gain of ch{num} -> IC{num-1}'}),
"cFilter_ENUM": (EpicsSignalWithRBV, (f"ES:AMP5004:cFilter{num}_ENUM"), {"kind":"config", "doc":f'Filter of ch{num} -> IC{num-1}'})
"cOnOff": (
EpicsSignal,
(f"ES:AMP5004.cOnOff{num}"),
{"kind": "config", "doc": f"Enable ch{num} -> IC{num-1}"},
),
"cGain_ENUM": (
EpicsSignalWithRBV,
(f"ES:AMP5004:cGain{num}_ENUM"),
{"kind": "config", "doc": f"Gain of ch{num} -> IC{num-1}"},
),
"cFilter_ENUM": (
EpicsSignalWithRBV,
(f"ES:AMP5004:cFilter{num}_ENUM"),
{"kind": "config", "doc": f"Filter of ch{num} -> IC{num-1}"},
),
}
amp = Dcpt(amp_signals)
gmes = Cpt(GasMixSetupControl, suffix=f"ES-GMES:IC{num-1}")
hv = Cpt(HighVoltageSuppliesControl, suffix=f"ES1-IC{num-1}:")
hv_en_signals = {
"ext_ena" : (EpicsSignalRO, "ES1-IC0:HV-Ext-Ena", {"kind" : "config", "doc" :'External enable signal of HV'}),
"ena" : (EpicsSignalRO, "ES1-IC0:HV-Ena", {"kind" : "config", "doc" :'Enable signal of HV'}),
"ext_ena": (
EpicsSignalRO,
"ES1-IC0:HV-Ext-Ena",
{"kind": "config", "doc": "External enable signal of HV"},
),
"ena": (EpicsSignalRO, "ES1-IC0:HV-Ena", {"kind": "config", "doc": "Enable signal of HV"}),
}
hv_en = Dcpt(hv_en_signals)
def __init__(self, name:str, scan_info = None, **kwargs):
def __init__(self, name: str, scan_info=None, **kwargs):
self.timeout_for_pvwait = 2.5
super().__init__(name=name, scan_info=scan_info, **kwargs)
@typechecked
def set_gain(
self,
gain: Literal['1e6', '1e7', '5e7', '1e8', '1e9'] | AmplifierGain,
) -> None:
def set_gain(self, gain: Literal["1e6", "1e7", "5e7", "1e8", "1e9"] | AmplifierGain) -> None:
"""Configure the gain setting of the specified channel
Args:
@@ -109,29 +104,35 @@ class IonizationChamber0(PSIDeviceBase):
if self.amp.cOnOff.get() == AmplifierEnable.OFF:
self.amp.cOnOff.put(AmplifierEnable.ON)
# Wait until channel is switched on
def _wait_enabled():
return self.amp.cOnOff.get() == AmplifierEnable.ON
if not self.wait_for_condition(_wait_enabled, check_stopped=True, timeout = self.timeout_for_pvwait):
if not self.wait_for_condition(
_wait_enabled, check_stopped=True, timeout=self.timeout_for_pvwait
):
raise TimeoutError(
f"Enabling channel run into timeout after {self.timeout_for_pvwait} seconds"
)
match gain:
case '1e6':
case "1e6":
self.amp.cGain_ENUM.put(AmplifierGain.G1E6)
case '1e7':
case "1e7":
self.amp.cGain_ENUM.put(AmplifierGain.G1E7)
case '5e7':
case "5e7":
self.amp.cGain_ENUM.put(AmplifierGain.G5E7)
case '1e8':
case "1e8":
self.amp.cGain_ENUM.put(AmplifierGain.G1E8)
case '1e9':
case "1e9":
self.amp.cGain_ENUM.put(AmplifierGain.G1E9)
def set_filter(
self,
value: Literal['1us', '3us', '10us', '30us', '100us', '300us', '1ms', '3ms'] | AmplifierFilter,
self,
value: (
Literal["1us", "3us", "10us", "30us", "100us", "300us", "1ms", "3ms"] | AmplifierFilter
),
) -> None:
"""Configure the filter setting of the specified channel
@@ -140,37 +141,38 @@ class IonizationChamber0(PSIDeviceBase):
"""
if self.amp.cOnOff.get() == AmplifierEnable.OFF:
self.amp.cOnOff.put(AmplifierEnable.ON)
# Wait until channel is switched on
def _wait_enabled():
return self.amp.cOnOff.get() == AmplifierEnable.ON
if not self.wait_for_condition(_wait_enabled, check_stopped=True, timeout = self.timeout_for_pvwait):
if not self.wait_for_condition(
_wait_enabled, check_stopped=True, timeout=self.timeout_for_pvwait
):
raise TimeoutError(
f"Enabling channel run into timeout after {self.timeout_for_pvwait} seconds"
)
match value:
case '1us':
case "1us":
self.amp.cFilter_ENUM.put(AmplifierFilter.F1US)
case '3us':
case "3us":
self.amp.cFilter_ENUM.put(AmplifierFilter.F3US)
case '10us':
case "10us":
self.amp.cFilter_ENUM.put(AmplifierFilter.F10US)
case '30us':
case "30us":
self.amp.cFilter_ENUM.put(AmplifierFilter.F30US)
case '100us':
case "100us":
self.amp.cFilter_ENUM.put(AmplifierFilter.F100US)
case '300us':
case "300us":
self.amp.cFilter_ENUM.put(AmplifierFilter.F300US)
case '1ms':
case "1ms":
self.amp.cFilter_ENUM.put(AmplifierFilter.F1MS)
case '3ms':
case "3ms":
self.amp.cFilter_ENUM.put(AmplifierFilter.F3MS)
@typechecked
def set_hv(
self,
hv: float,
) -> None:
def set_hv(self, hv: float) -> None:
"""Configure the high voltage settings , this will
enable the high voltage (if external enable is active)!
@@ -179,13 +181,15 @@ class IonizationChamber0(PSIDeviceBase):
"""
if not 0 < hv < 3000:
raise ValueError(f'specified HV {hv} not within range [0 .. 3000]')
raise ValueError(f"specified HV {hv} not within range [0 .. 3000]")
if self.hv.grid_v.get() > hv:
raise ValueError(f'Grid {self.hv.grid_v.get()} must not be higher than HV {hv}!')
if not self.hv_en.ena.get() == 1 :
raise ValueError(f"Grid {self.hv.grid_v.get()} must not be higher than HV {hv}!")
if not self.hv_en.ena.get() == 1:
def check_ch_ena(*, old_value, value, **kwargs):
return value == 1
status = SubscriptionStatus(device=self.hv_en.ena, callback=check_ch_ena)
self.hv_en.ena.put(1)
# Wait after setting ena to 1
@@ -196,10 +200,7 @@ class IonizationChamber0(PSIDeviceBase):
self.hv.hv_v.put(hv)
@typechecked
def set_grid(
self,
grid: float,
) -> None:
def set_grid(self, grid: float) -> None:
"""Configure the high voltage settings , this will
enable the high voltage (if external enable is active)!
@@ -208,13 +209,15 @@ class IonizationChamber0(PSIDeviceBase):
"""
if not 0 < grid < 3000:
raise ValueError(f'specified Grid {grid} not within range [0 .. 3000]')
raise ValueError(f"specified Grid {grid} not within range [0 .. 3000]")
if grid > self.hv.hv_v.get():
raise ValueError(f'Grid {grid} must not be higher than HV {self.hv.hv_v.get()}!')
if not self.hv_en.ena.get() == 1 :
raise ValueError(f"Grid {grid} must not be higher than HV {self.hv.hv_v.get()}!")
if not self.hv_en.ena.get() == 1:
def check_ch_ena(*, old_value, value, **kwargs):
return value == 1
status = SubscriptionStatus(device=self.hv_en.ena, callback=check_ch_ena)
self.hv_en.ena.put(1)
# Wait after setting ena to 1
@@ -227,13 +230,13 @@ class IonizationChamber0(PSIDeviceBase):
@typechecked
def fill(
self,
gas1: Literal['He', 'N2', 'Ar', 'Kr'],
gas1: Literal["He", "N2", "Ar", "Kr"],
conc1: float,
gas2: Literal['He', 'N2', 'Ar', 'Kr'],
gas2: Literal["He", "N2", "Ar", "Kr"],
conc2: float,
pressure: float,
*,
wait:bool = False,
wait: bool = False,
) -> DeviceStatus:
"""Fill an ionization chamber with the specified gas mixture.
@@ -247,13 +250,13 @@ class IonizationChamber0(PSIDeviceBase):
"""
if 100 < conc1 < 0:
raise ValueError(f'Concentration 1 {conc1} out of range [0 .. 100 %]')
raise ValueError(f"Concentration 1 {conc1} out of range [0 .. 100 %]")
if 100 < conc2 < 0:
raise ValueError(f'Concentration 2 {conc2} out of range [0 .. 100 %]')
if not np.isclose((conc1+conc2), 100, atol=0.1):
raise ValueError(f"Conc1 {conc1} and conc2 {conc2} must sum to 100 +- 0.1")
raise ValueError(f"Concentration 2 {conc2} out of range [0 .. 100 %]")
if not np.isclose((conc1 + conc2), 100, atol=0.1):
raise ValueError(f"Conc1 {conc1} and conc2 {conc2} must sum to 100 +- 0.1")
if 3 < pressure < 0:
raise ValueError(f'Pressure {pressure} out of range [0 .. 3 bar abs]')
raise ValueError(f"Pressure {pressure} out of range [0 .. 3 bar abs]")
self.gmes.gas1_req.set(gas1).wait(timeout=3)
self.gmes.conc1_req.set(conc1).wait(timeout=3)
@@ -261,16 +264,23 @@ class IonizationChamber0(PSIDeviceBase):
self.gmes.conc2_req.set(conc2).wait(timeout=3)
self.gmes.fill.put(1)
def wait_for_status():
return self.gmes.status.get() == 0
timeout = 3
if not self.wait_for_condition(wait_for_status, timeout=timeout, check_stopped=True):
raise TimeoutError(f"Ionization chamber filling process did not start after {timeout}s. Last log message {self.gmes.status_msg.get()}")
raise TimeoutError(
f"Ionization chamber filling process did not start after {timeout}s. Last log message {self.gmes.status_msg.get()}"
)
def wait_for_filling_finished():
return self.gmes.status.get() == 1
# Wait until ionization chamber is filled successfully
status = self.task_handler.submit_task(task=self.wait_for_condition, task_args=(wait_for_filling_finished, 360, True))
status = self.task_handler.submit_task(
task=self.wait_for_condition, task_args=(wait_for_filling_finished, 360, True)
)
if wait:
status.wait()
return status
@@ -279,35 +289,68 @@ class IonizationChamber0(PSIDeviceBase):
class IonizationChamber1(PSIDeviceBase):
"""Ionization Chamber 0, prefix should be 'X01DA-'."""
num=2
num = 2
amp_signals = {
"cOnOff": (EpicsSignal, (f"ES:AMP5004.cOnOff{num}"), {"kind":"config", "doc":f'Enable ch{num} -> IC{num-1}'}),
"cGain_ENUM": (EpicsSignalWithRBV, (f"ES:AMP5004:cGain{num}_ENUM"), {"kind":"config", "doc":f'Gain of ch{num} -> IC{num-1}'}),
"cFilter_ENUM": (EpicsSignalWithRBV, (f"ES:AMP5004:cFilter{num}_ENUM"), {"kind":"config", "doc":f'Filter of ch{num} -> IC{num-1}'})
"cOnOff": (
EpicsSignal,
(f"ES:AMP5004.cOnOff{num}"),
{"kind": "config", "doc": f"Enable ch{num} -> IC{num-1}"},
),
"cGain_ENUM": (
EpicsSignalWithRBV,
(f"ES:AMP5004:cGain{num}_ENUM"),
{"kind": "config", "doc": f"Gain of ch{num} -> IC{num-1}"},
),
"cFilter_ENUM": (
EpicsSignalWithRBV,
(f"ES:AMP5004:cFilter{num}_ENUM"),
{"kind": "config", "doc": f"Filter of ch{num} -> IC{num-1}"},
),
}
amp = Dcpt(amp_signals)
gmes = Cpt(GasMixSetupControl, suffix=f"ES-GMES:IC{num-1}")
hv = Cpt(HighVoltageSuppliesControl, suffix=f"ES2-IC{num-1}:")
hv_en_signals = {
"ext_ena" : (EpicsSignalRO, "ES2-IC12:HV-Ext-Ena", {"kind" : "config", "doc" :'External enable signal of HV'}),
"ena" : (EpicsSignalRO, "ES2-IC12:HV-Ena", {"kind" : "config", "doc" :'Enable signal of HV'}),
"ext_ena": (
EpicsSignalRO,
"ES2-IC12:HV-Ext-Ena",
{"kind": "config", "doc": "External enable signal of HV"},
),
"ena": (EpicsSignalRO, "ES2-IC12:HV-Ena", {"kind": "config", "doc": "Enable signal of HV"}),
}
hv_en = Dcpt(hv_en_signals)
class IonizationChamber2(PSIDeviceBase):
"""Ionization Chamber 0, prefix should be 'X01DA-'."""
num=3
num = 3
amp_signals = {
"cOnOff": (EpicsSignal, (f"ES:AMP5004.cOnOff{num}"), {"kind":"config", "doc":f'Enable ch{num} -> IC{num-1}'}),
"cGain_ENUM": (EpicsSignalWithRBV, (f"ES:AMP5004:cGain{num}_ENUM"), {"kind":"config", "doc":f'Gain of ch{num} -> IC{num-1}'}),
"cFilter_ENUM": (EpicsSignalWithRBV, (f"ES:AMP5004:cFilter{num}_ENUM"), {"kind":"config", "doc":f'Filter of ch{num} -> IC{num-1}'})
"cOnOff": (
EpicsSignal,
(f"ES:AMP5004.cOnOff{num}"),
{"kind": "config", "doc": f"Enable ch{num} -> IC{num-1}"},
),
"cGain_ENUM": (
EpicsSignalWithRBV,
(f"ES:AMP5004:cGain{num}_ENUM"),
{"kind": "config", "doc": f"Gain of ch{num} -> IC{num-1}"},
),
"cFilter_ENUM": (
EpicsSignalWithRBV,
(f"ES:AMP5004:cFilter{num}_ENUM"),
{"kind": "config", "doc": f"Filter of ch{num} -> IC{num-1}"},
),
}
amp = Dcpt(amp_signals)
gmes = Cpt(GasMixSetupControl, suffix=f"ES-GMES:IC{num-1}")
hv = Cpt(HighVoltageSuppliesControl, suffix=f"ES2-IC{num-1}:")
hv_en_signals = {
"ext_ena" : (EpicsSignalRO, "ES2-IC12:HV-Ext-Ena", {"kind" : "config", "doc" :'External enable signal of HV'}),
"ena" : (EpicsSignalRO, "ES2-IC12:HV-Ena", {"kind" : "config", "doc" :'Enable signal of HV'}),
"ext_ena": (
EpicsSignalRO,
"ES2-IC12:HV-Ext-Ena",
{"kind": "config", "doc": "External enable signal of HV"},
),
"ena": (EpicsSignalRO, "ES2-IC12:HV-Ena", {"kind": "config", "doc": "Enable signal of HV"}),
}
hv_en = Dcpt(hv_en_signals)
hv_en = Dcpt(hv_en_signals)

View File

@@ -1,5 +1,6 @@
import enum
class AmplifierEnable(int, enum.Enum):
"""Enum class for the enable signal of the channel"""
@@ -7,6 +8,7 @@ class AmplifierEnable(int, enum.Enum):
STARTUP = 1
ON = 2
class AmplifierGain(int, enum.Enum):
"""Enum class for the gain of the channel"""
@@ -16,14 +18,15 @@ class AmplifierGain(int, enum.Enum):
G1E8 = 3
G1E9 = 4
class AmplifierFilter(int, enum.Enum):
"""Enum class for the filter of the channel"""
F1US = 0
F3US = 1
F10US = 2
F30US = 3
F1US = 0
F3US = 1
F10US = 2
F30US = 3
F100US = 4
F300US = 5
F1MS = 6
F3MS = 7
F1MS = 6
F3MS = 7

View File

@@ -323,7 +323,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
while time.time() - start_time < timeout:
if signal.get() == value:
return None
if self.stopped is True: # Should this check be optional or configurable?!
if self.stopped is True: # Should this check be optional or configurable?!
raise DeviceStopError(f"Device {self.name} was stopped while waiting for signal")
time.sleep(0.1)
# If we end up here, the status did not resolve
@@ -445,7 +445,6 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
if hasattr(self.scan_parameter, key):
setattr(self.scan_parameter, key, value)
def _check_scan_msg(self, target_state: ScanControlLoadMessage) -> None:
"""Check if the scan message is gettting available

View File

@@ -1,31 +1,27 @@
""" Module for additional utils of the Mo1 Bragg Positioner"""
"""Module for additional utils of the Mo1 Bragg Positioner"""
import numpy as np
from scipy.interpolate import BSpline
################ Define Constants ############
SAFETY_FACTOR = 0.025 # safety factor to limit acceleration -> NEVER SET TO ZERO !
N_SAMPLES = 41 # number of samples to generate -> Always choose uneven number,
# otherwise peak value will not be included
DEGREE_SPLINE = 3 # DEGREE_SPLINE of spline, 3 works good
TIME_COMPENSATE_SPLINE = 0.0062 # time to be compensated each spline in s
POSITION_COMPONSATION = 0.02 # angle to add at both limits, must be same values
# as used on ACS controller for simple scans
SAFETY_FACTOR = 0.025 # safety factor to limit acceleration -> NEVER SET TO ZERO !
N_SAMPLES = 41 # number of samples to generate -> Always choose uneven number,
# otherwise peak value will not be included
DEGREE_SPLINE = 3 # DEGREE_SPLINE of spline, 3 works good
TIME_COMPENSATE_SPLINE = 0.0062 # time to be compensated each spline in s
POSITION_COMPONSATION = 0.02 # angle to add at both limits, must be same values
# as used on ACS controller for simple scans
class Mo1UtilsSplineError(Exception):
""" Exception for spline computation"""
"""Exception for spline computation"""
def compute_spline(
low_deg:float,
high_deg:float,
p_kink:float,
e_kink_deg:float,
scan_time:float,
low_deg: float, high_deg: float, p_kink: float, e_kink_deg: float, scan_time: float
) -> tuple[float, float, float]:
""" Spline computation for the advanced scan mode
"""Spline computation for the advanced scan mode
Args:
low_deg (float): Low angle value of the scan in deg
high_deg (float): High angle value of the scan in deg
@@ -42,53 +38,56 @@ def compute_spline(
high_deg = high_deg + POSITION_COMPONSATION
if p_kink < 0 or p_kink > 100:
raise Mo1UtilsSplineError("Kink position not within range of [0..100%]"+
f"for p_kink: {p_kink}")
raise Mo1UtilsSplineError(
"Kink position not within range of [0..100%]" + f"for p_kink: {p_kink}"
)
if e_kink_deg < low_deg or e_kink_deg > high_deg:
raise Mo1UtilsSplineError("Kink energy not within selected energy range of scan,"+
f"for e_kink_deg {e_kink_deg}, low_deg {low_deg} and"+
f"high_deg {high_deg}.")
raise Mo1UtilsSplineError(
"Kink energy not within selected energy range of scan,"
+ f"for e_kink_deg {e_kink_deg}, low_deg {low_deg} and"
+ f"high_deg {high_deg}."
)
tc1 = SAFETY_FACTOR / scan_time * TIME_COMPENSATE_SPLINE
t_kink = (scan_time - TIME_COMPENSATE_SPLINE - 2*(SAFETY_FACTOR - tc1)) * p_kink/100 + (SAFETY_FACTOR - tc1)
t_kink = (scan_time - TIME_COMPENSATE_SPLINE - 2 * (SAFETY_FACTOR - tc1)) * p_kink / 100 + (
SAFETY_FACTOR - tc1
)
t_input = [0,
SAFETY_FACTOR - tc1,
t_kink,
scan_time - TIME_COMPENSATE_SPLINE - SAFETY_FACTOR + tc1,
scan_time - TIME_COMPENSATE_SPLINE]
p_input = [0,
0,
e_kink_deg - low_deg,
high_deg - low_deg,
high_deg - low_deg]
t_input = [
0,
SAFETY_FACTOR - tc1,
t_kink,
scan_time - TIME_COMPENSATE_SPLINE - SAFETY_FACTOR + tc1,
scan_time - TIME_COMPENSATE_SPLINE,
]
p_input = [0, 0, e_kink_deg - low_deg, high_deg - low_deg, high_deg - low_deg]
cv = np.stack((t_input, p_input)).T # spline coefficients
cv = np.stack((t_input, p_input)).T # spline coefficients
max_param = len(cv) - DEGREE_SPLINE
kv = np.clip(np.arange(len(cv)+DEGREE_SPLINE+1)-DEGREE_SPLINE,0,max_param) # knots
spl = BSpline(kv, cv, DEGREE_SPLINE) # get spline function
p = spl(np.linspace(0,max_param,N_SAMPLES))
v = spl(np.linspace(0,max_param,N_SAMPLES), 1)
a = spl(np.linspace(0,max_param,N_SAMPLES), 2)
j = spl(np.linspace(0,max_param,N_SAMPLES), 3)
kv = np.clip(np.arange(len(cv) + DEGREE_SPLINE + 1) - DEGREE_SPLINE, 0, max_param) # knots
spl = BSpline(kv, cv, DEGREE_SPLINE) # get spline function
p = spl(np.linspace(0, max_param, N_SAMPLES))
v = spl(np.linspace(0, max_param, N_SAMPLES), 1)
a = spl(np.linspace(0, max_param, N_SAMPLES), 2)
j = spl(np.linspace(0, max_param, N_SAMPLES), 3)
tim, pos = p.T
pos = pos + low_deg
vel = v[:,1]/v[:,0]
vel = v[:, 1] / v[:, 0]
acc = []
for item in a:
acc.append(0) if item[1] == 0 else acc.append(item[1]/item[0])
acc.append(0) if item[1] == 0 else acc.append(item[1] / item[0])
jerk = []
for item in j:
jerk.append(0) if item[1] == 0 else jerk.append(item[1]/item[0])
jerk.append(0) if item[1] == 0 else jerk.append(item[1] / item[0])
dt = np.zeros(len(tim))
for i in np.arange(len(tim)):
if i == 0:
dt[i] = 0
else:
dt[i] = 1000*(tim[i]-tim[i-1])
dt[i] = 1000 * (tim[i] - tim[i - 1])
return pos, vel, dt

View File

@@ -1,35 +1,27 @@
""" ES2 Pilatus Curtain"""
"""ES2 Pilatus Curtain"""
import time
from ophyd import Component as Cpt
from ophyd import Device, Kind, EpicsSignal, EpicsSignalRO
from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind
from ophyd_devices.utils import bec_utils
class GasMixSetup(Device):
"""Class for the ES2 Pilatus Curtain"""
USER_ACCESS = ['open', 'close']
USER_ACCESS = ["open", "close"]
open_cover = Cpt(
EpicsSignal, suffix="OpenCover", kind="config", doc='Open Cover'
)
close_cover = Cpt(
EpicsSignal, suffix="CloseCover", kind="config", doc='Close Cover'
)
open_cover = Cpt(EpicsSignal, suffix="OpenCover", kind="config", doc="Open Cover")
close_cover = Cpt(EpicsSignal, suffix="CloseCover", kind="config", doc="Close Cover")
cover_is_closed = Cpt(
EpicsSignalRO, suffix="CoverIsClosed", kind="config", doc='Cover is closed'
)
cover_is_open = Cpt(
EpicsSignalRO, suffix="CoverIsOpen", kind="config", doc='Cover is open'
EpicsSignalRO, suffix="CoverIsClosed", kind="config", doc="Cover is closed"
)
cover_is_open = Cpt(EpicsSignalRO, suffix="CoverIsOpen", kind="config", doc="Cover is open")
cover_is_moving = Cpt(
EpicsSignalRO, suffix="CoverIsMoving", kind="config", doc='Cover is moving'
EpicsSignalRO, suffix="CoverIsMoving", kind="config", doc="Cover is moving"
)
cover_error = Cpt(
EpicsSignalRO, suffix="CoverError", kind="config", doc='Cover error'
)
cover_error = Cpt(EpicsSignalRO, suffix="CoverError", kind="config", doc="Cover error")
def __init__(
self, prefix="", *, name: str, kind: Kind = None, device_manager=None, parent=None, **kwargs
@@ -68,7 +60,7 @@ class GasMixSetup(Device):
while not self.cover_is_open.get():
time.sleep(0.1)
if self.cover_error.get():
raise TimeoutError('Curtain did not open successfully and is now in an error state')
raise TimeoutError("Curtain did not open successfully and is now in an error state")
def close(self) -> None:
"""Close the cover"""
@@ -78,4 +70,6 @@ class GasMixSetup(Device):
while not self.cover_is_closed.get():
time.sleep(0.1)
if self.cover_error.get():
raise TimeoutError('Curtain did not close successfully and is now in an error state')
raise TimeoutError(
"Curtain did not close successfully and is now in an error state"
)

View File

@@ -2,6 +2,7 @@
import time
from typing import Literal
import numpy as np
from bec_lib.device import DeviceBase
from bec_lib.logger import bec_logger
@@ -59,7 +60,7 @@ class XASSimpleScan(AsyncFlyScanBase):
def update_readout_priority(self):
"""Ensure that NIDAQ is not monitored for any quick EXAFS."""
super().update_readout_priority()
self.readout_priority['async'].append('nidaq')
self.readout_priority["async"].append("nidaq")
def prepare_positions(self):
"""Prepare the positions for the scan.