From d3ab7316ac6917255c3d463f7996d79db1c9fbb0 Mon Sep 17 00:00:00 2001 From: appel_c Date: Tue, 18 Mar 2025 19:26:31 +0100 Subject: [PATCH] fix: formatting --- debye_bec/devices/cameras/basler_cam.py | 23 +- debye_bec/devices/cameras/prosilica_cam.py | 21 +- debye_bec/devices/es0filter.py | 55 ++-- .../ionization_chambers/ionization_chamber.py | 293 ++++++++++-------- .../ionization_chamber_enums.py | 15 +- debye_bec/devices/mo1_bragg/mo1_bragg.py | 3 +- .../devices/mo1_bragg/mo1_bragg_utils.py | 85 +++-- debye_bec/devices/pilatus_curtain.py | 34 +- debye_bec/scans/mono_bragg_scans.py | 3 +- 9 files changed, 284 insertions(+), 248 deletions(-) diff --git a/debye_bec/devices/cameras/basler_cam.py b/debye_bec/devices/cameras/basler_cam.py index c526e8d..d5699b3 100644 --- a/debye_bec/devices/cameras/basler_cam.py +++ b/debye_bec/devices/cameras/basler_cam.py @@ -1,32 +1,29 @@ - - - -from ophyd_devices.devices.areadetector.cam import AravisDetectorCam -from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 -from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase -from ophyd import ADComponent as ADCpt -from ophyd import ADBase, Kind # from ophyd_devices.sim.sim_signals import SetableSignal # from ophyd_devices.utils.psi_component import PSIComponent, SignalType import numpy as np - +from ophyd import ADBase +from ophyd import ADComponent as ADCpt +from ophyd import Kind +from ophyd_devices.devices.areadetector.cam import AravisDetectorCam +from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase class BaslerCamBase(ADBase): cam1 = ADCpt(AravisDetectorCam, "cam1:") - image1 = ADCpt(ImagePlugin_V35, 'image1:') + image1 = ADCpt(ImagePlugin_V35, "image1:") + class BaslerCam(PSIDeviceBase, BaslerCamBase): # preview_2d = PSIComponent(SetableSignal, signal_type=SignalType.PREVIEW, ndim=2, kind=Kind.omitted) - def emit_to_bec(self, *args, obj=None, old_value=None, value=None, **kwargs): width = self.image1.array_size.width.get() height = self.image1.array_size.height.get() - data = np.reshape(value, (height,width)) + data = np.reshape(value, (height, width)) # self.preview_2d.put(data) self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data) def on_connected(self): - self.image1.array_data.subscribe(self.emit_to_bec, run=False) \ No newline at end of file + self.image1.array_data.subscribe(self.emit_to_bec, run=False) diff --git a/debye_bec/devices/cameras/prosilica_cam.py b/debye_bec/devices/cameras/prosilica_cam.py index 0582eb0..9d9118d 100644 --- a/debye_bec/devices/cameras/prosilica_cam.py +++ b/debye_bec/devices/cameras/prosilica_cam.py @@ -1,26 +1,25 @@ - - - +import numpy as np +from ophyd import ADBase +from ophyd import ADComponent as ADCpt +from ophyd import Component as Cpt +from ophyd import Device from ophyd_devices.devices.areadetector.cam import ProsilicaDetectorCam from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase -from ophyd import Component as Cpt -from ophyd import ADComponent as ADCpt -from ophyd import Device, ADBase -import numpy as np class ProsilicaCamBase(ADBase): cam1 = ADCpt(ProsilicaDetectorCam, "cam1:") - image1 = ADCpt(ImagePlugin_V35, 'image1:') + image1 = ADCpt(ImagePlugin_V35, "image1:") + class ProsilicaCam(PSIDeviceBase, ProsilicaCamBase): - + def emit_to_bec(self, *args, obj=None, old_value=None, value=None, **kwargs): width = self.image1.array_size.width.get() height = self.image1.array_size.height.get() - data = np.reshape(value, (height,width)) + data = np.reshape(value, (height, width)) self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data) def on_connected(self): - self.image1.array_data.subscribe(self.emit_to_bec, run=False) \ No newline at end of file + self.image1.array_data.subscribe(self.emit_to_bec, run=False) diff --git a/debye_bec/devices/es0filter.py b/debye_bec/devices/es0filter.py index 9c08405..9ccdfb4 100644 --- a/debye_bec/devices/es0filter.py +++ b/debye_bec/devices/es0filter.py @@ -1,34 +1,35 @@ -""" ES0 Filter Station""" +"""ES0 Filter Station""" + +from typing import Literal from ophyd import Component as Cpt -from ophyd import Device, Kind, EpicsSignal -from typing import Literal +from ophyd import Device, EpicsSignal, Kind +from ophyd_devices.utils import bec_utils from typeguard import typechecked -from ophyd_devices.utils import bec_utils class EpicsSignalWithRBVBit(EpicsSignal): - def __init__(self, prefix, *, bit:int, **kwargs): + def __init__(self, prefix, *, bit: int, **kwargs): super().__init__(prefix, **kwargs) - self.bit = bit + self.bit = bit @typechecked - def put(self, value:Literal[0,1], **kwargs): + def put(self, value: Literal[0, 1], **kwargs): bit_value = super().get() - #convert to int + # convert to int bit_value = int(bit_value) - if value ==1: + if value == 1: new_value = bit_value | (1 << self.bit) else: new_value = bit_value & ~(1 << self.bit) super().put(new_value, **kwargs) - def get(self, **kwargs) -> Literal[0,1]: + def get(self, **kwargs) -> Literal[0, 1]: bit_value = super().get() - #convert to int + # convert to int bit_value = int(bit_value) - if (bit_value & (1 << self.bit)) ==0: + if (bit_value & (1 << self.bit)) == 0: return 0 return 1 @@ -36,18 +37,18 @@ class EpicsSignalWithRBVBit(EpicsSignal): class ES0Filter(Device): """Class for the ES0 filter station X01DA-ES0-FI:""" - Mo400 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=1,kind="config",doc='Mo400 filter') - Mo300 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=2,kind="config",doc='Mo300 filter') - Mo200 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=3,kind="config",doc='Mo200 filter') - Zn500 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=4,kind="config",doc='Zn500 filter') - Zn250 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=5,kind="config",doc='Zn250 filter') - Zn125 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=6,kind="config",doc='Zn125 filter') - Zn50 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=7,kind="config",doc='Zn50 filter') - Zn25 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=8,kind="config",doc='Zn25 filter') - Al500 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=9,kind="config",doc='Al500 filter') - Al320 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=10,kind="config",doc='Al320 filter') - Al200 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=11,kind="config",doc='Al200 filter') - Al100 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=12,kind="config",doc='Al100 filter') - Al50 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=13,kind="config",doc='Al50 filter') - Al20 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=14,kind="config",doc='Al20 filter') - Al10 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=15,kind="config",doc='Al10 filter') \ No newline at end of file + Mo400 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=1, kind="config", doc="Mo400 filter") + Mo300 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=2, kind="config", doc="Mo300 filter") + Mo200 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=3, kind="config", doc="Mo200 filter") + Zn500 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=4, kind="config", doc="Zn500 filter") + Zn250 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=5, kind="config", doc="Zn250 filter") + Zn125 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=6, kind="config", doc="Zn125 filter") + Zn50 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=7, kind="config", doc="Zn50 filter") + Zn25 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=8, kind="config", doc="Zn25 filter") + Al500 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=9, kind="config", doc="Al500 filter") + Al320 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=10, kind="config", doc="Al320 filter") + Al200 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=11, kind="config", doc="Al200 filter") + Al100 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=12, kind="config", doc="Al100 filter") + Al50 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=13, kind="config", doc="Al50 filter") + Al20 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=14, kind="config", doc="Al20 filter") + Al10 = Cpt(EpicsSignalWithRBVBit, suffix="BIO", bit=15, kind="config", doc="Al10 filter") diff --git a/debye_bec/devices/ionization_chambers/ionization_chamber.py b/debye_bec/devices/ionization_chambers/ionization_chamber.py index 6492839..6c2b9e0 100644 --- a/debye_bec/devices/ionization_chambers/ionization_chamber.py +++ b/debye_bec/devices/ionization_chambers/ionization_chamber.py @@ -1,106 +1,101 @@ -from ophyd import Device,Kind,Component as Cpt, DynamicDeviceComponent as Dcpt -from ophyd import EpicsSignalWithRBV, EpicsSignal, EpicsSignalRO -from ophyd.status import SubscriptionStatus, DeviceStatus -from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from typing import Literal -from typeguard import typechecked -import numpy as np -from debye_bec.devices.ionization_chambers.ionization_chamber_enums import AmplifierEnable, AmplifierGain, AmplifierFilter +import numpy as np +from ophyd import Component as Cpt +from ophyd import Device +from ophyd import DynamicDeviceComponent as Dcpt +from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Kind +from ophyd.status import DeviceStatus, SubscriptionStatus +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase +from typeguard import typechecked + +from debye_bec.devices.ionization_chambers.ionization_chamber_enums import ( + AmplifierEnable, + AmplifierFilter, + AmplifierGain, +) + class EpicsSignalSplit(EpicsSignal): - """ Wrapper around EpicsSignal with different read and write pv""" - + """Wrapper around EpicsSignal with different read and write pv""" + def __init__(self, prefix, **kwargs): - super().__init__(prefix + "-RB", write_pv=prefix+"Set", **kwargs) + super().__init__(prefix + "-RB", write_pv=prefix + "Set", **kwargs) + class GasMixSetupControl(Device): """GasMixSetup Control for Inonization Chamber 0""" - gas1_req = Cpt( - EpicsSignalWithRBV, suffix="Gas1Req", kind="config", doc='Gas 1 requirement' - ) + gas1_req = Cpt(EpicsSignalWithRBV, suffix="Gas1Req", kind="config", doc="Gas 1 requirement") conc1_req = Cpt( - EpicsSignalWithRBV, suffix="Conc1Req", kind="config", doc='Concentration 1 requirement' - ) - gas2_req = Cpt( - EpicsSignalWithRBV, suffix="Gas2Req", kind="config", doc='Gas 2 requirement' + EpicsSignalWithRBV, suffix="Conc1Req", kind="config", doc="Concentration 1 requirement" ) + gas2_req = Cpt(EpicsSignalWithRBV, suffix="Gas2Req", kind="config", doc="Gas 2 requirement") conc2_req = Cpt( - EpicsSignalWithRBV, suffix="Conc2Req", kind="config", doc='Concentration 2 requirement' + EpicsSignalWithRBV, suffix="Conc2Req", kind="config", doc="Concentration 2 requirement" ) press_req = Cpt( - EpicsSignalWithRBV, suffix="PressReq", kind="config", doc='Pressure requirement' - ) - fill = Cpt( - EpicsSignal, suffix="Fill", kind="config", doc='Fill the chamber' - ) - status = Cpt( - EpicsSignalRO, suffix="Status", kind="config", doc='Status' - ) - gas1 = Cpt( - EpicsSignalRO, suffix="Gas1", kind="config", doc='Gas 1' - ) - conc1 = Cpt( - EpicsSignalRO, suffix="Conc1", kind="config", doc='Concentration 1' - ) - gas2 = Cpt( - EpicsSignalRO, suffix="Gas2", kind="config", doc='Gas 2' - ) - conc2 = Cpt( - EpicsSignalRO, suffix="Conc2", kind="config", doc='Concentration 2' - ) - press = Cpt( - EpicsSignalRO, suffix="PressTransm", kind="config", doc='Current Pressure' - ) - status_msg = Cpt( - EpicsSignalRO, suffix="StatusMsg0", kind="config", doc='Status' + EpicsSignalWithRBV, suffix="PressReq", kind="config", doc="Pressure requirement" ) + fill = Cpt(EpicsSignal, suffix="Fill", kind="config", doc="Fill the chamber") + status = Cpt(EpicsSignalRO, suffix="Status", kind="config", doc="Status") + gas1 = Cpt(EpicsSignalRO, suffix="Gas1", kind="config", doc="Gas 1") + conc1 = Cpt(EpicsSignalRO, suffix="Conc1", kind="config", doc="Concentration 1") + gas2 = Cpt(EpicsSignalRO, suffix="Gas2", kind="config", doc="Gas 2") + conc2 = Cpt(EpicsSignalRO, suffix="Conc2", kind="config", doc="Concentration 2") + press = Cpt(EpicsSignalRO, suffix="PressTransm", kind="config", doc="Current Pressure") + status_msg = Cpt(EpicsSignalRO, suffix="StatusMsg0", kind="config", doc="Status") class HighVoltageSuppliesControl(Device): - """ HighVoltage Supplies Control for Ionization Chamber 0""" + """HighVoltage Supplies Control for Ionization Chamber 0""" + + hv_v = Cpt(EpicsSignalSplit, suffix="HV1-V", kind="config", doc="HV voltage") + hv_i = Cpt(EpicsSignalSplit, suffix="HV1-I", kind="config", doc="HV current") + grid_v = Cpt(EpicsSignalSplit, suffix="HV2-V", kind="config", doc="Grid voltage") + grid_i = Cpt(EpicsSignalSplit, suffix="HV2-I", kind="config", doc="Grid current") - hv_v = Cpt( - EpicsSignalSplit, suffix="HV1-V", kind="config", doc='HV voltage' - ) - hv_i = Cpt( - EpicsSignalSplit, suffix="HV1-I", kind="config", doc='HV current' - ) - grid_v = Cpt( - EpicsSignalSplit, suffix="HV2-V", kind="config", doc='Grid voltage' - ) - grid_i = Cpt( - EpicsSignalSplit, suffix="HV2-I", kind="config", doc='Grid current' - ) class IonizationChamber0(PSIDeviceBase): """Ionization Chamber 0, prefix should be 'X01DA-'.""" - num=1 + num = 1 amp_signals = { - "cOnOff": (EpicsSignal, (f"ES:AMP5004.cOnOff{num}"), {"kind":"config", "doc":f'Enable ch{num} -> IC{num-1}'}), - "cGain_ENUM": (EpicsSignalWithRBV, (f"ES:AMP5004:cGain{num}_ENUM"), {"kind":"config", "doc":f'Gain of ch{num} -> IC{num-1}'}), - "cFilter_ENUM": (EpicsSignalWithRBV, (f"ES:AMP5004:cFilter{num}_ENUM"), {"kind":"config", "doc":f'Filter of ch{num} -> IC{num-1}'}) + "cOnOff": ( + EpicsSignal, + (f"ES:AMP5004.cOnOff{num}"), + {"kind": "config", "doc": f"Enable ch{num} -> IC{num-1}"}, + ), + "cGain_ENUM": ( + EpicsSignalWithRBV, + (f"ES:AMP5004:cGain{num}_ENUM"), + {"kind": "config", "doc": f"Gain of ch{num} -> IC{num-1}"}, + ), + "cFilter_ENUM": ( + EpicsSignalWithRBV, + (f"ES:AMP5004:cFilter{num}_ENUM"), + {"kind": "config", "doc": f"Filter of ch{num} -> IC{num-1}"}, + ), } amp = Dcpt(amp_signals) gmes = Cpt(GasMixSetupControl, suffix=f"ES-GMES:IC{num-1}") hv = Cpt(HighVoltageSuppliesControl, suffix=f"ES1-IC{num-1}:") hv_en_signals = { - "ext_ena" : (EpicsSignalRO, "ES1-IC0:HV-Ext-Ena", {"kind" : "config", "doc" :'External enable signal of HV'}), - "ena" : (EpicsSignalRO, "ES1-IC0:HV-Ena", {"kind" : "config", "doc" :'Enable signal of HV'}), + "ext_ena": ( + EpicsSignalRO, + "ES1-IC0:HV-Ext-Ena", + {"kind": "config", "doc": "External enable signal of HV"}, + ), + "ena": (EpicsSignalRO, "ES1-IC0:HV-Ena", {"kind": "config", "doc": "Enable signal of HV"}), } hv_en = Dcpt(hv_en_signals) - def __init__(self, name:str, scan_info = None, **kwargs): + def __init__(self, name: str, scan_info=None, **kwargs): self.timeout_for_pvwait = 2.5 super().__init__(name=name, scan_info=scan_info, **kwargs) @typechecked - def set_gain( - self, - gain: Literal['1e6', '1e7', '5e7', '1e8', '1e9'] | AmplifierGain, - ) -> None: + def set_gain(self, gain: Literal["1e6", "1e7", "5e7", "1e8", "1e9"] | AmplifierGain) -> None: """Configure the gain setting of the specified channel Args: @@ -109,29 +104,35 @@ class IonizationChamber0(PSIDeviceBase): if self.amp.cOnOff.get() == AmplifierEnable.OFF: self.amp.cOnOff.put(AmplifierEnable.ON) + # Wait until channel is switched on def _wait_enabled(): return self.amp.cOnOff.get() == AmplifierEnable.ON - if not self.wait_for_condition(_wait_enabled, check_stopped=True, timeout = self.timeout_for_pvwait): + + if not self.wait_for_condition( + _wait_enabled, check_stopped=True, timeout=self.timeout_for_pvwait + ): raise TimeoutError( f"Enabling channel run into timeout after {self.timeout_for_pvwait} seconds" ) - + match gain: - case '1e6': + case "1e6": self.amp.cGain_ENUM.put(AmplifierGain.G1E6) - case '1e7': + case "1e7": self.amp.cGain_ENUM.put(AmplifierGain.G1E7) - case '5e7': + case "5e7": self.amp.cGain_ENUM.put(AmplifierGain.G5E7) - case '1e8': + case "1e8": self.amp.cGain_ENUM.put(AmplifierGain.G1E8) - case '1e9': + case "1e9": self.amp.cGain_ENUM.put(AmplifierGain.G1E9) def set_filter( - self, - value: Literal['1us', '3us', '10us', '30us', '100us', '300us', '1ms', '3ms'] | AmplifierFilter, + self, + value: ( + Literal["1us", "3us", "10us", "30us", "100us", "300us", "1ms", "3ms"] | AmplifierFilter + ), ) -> None: """Configure the filter setting of the specified channel @@ -140,37 +141,38 @@ class IonizationChamber0(PSIDeviceBase): """ if self.amp.cOnOff.get() == AmplifierEnable.OFF: self.amp.cOnOff.put(AmplifierEnable.ON) + # Wait until channel is switched on def _wait_enabled(): return self.amp.cOnOff.get() == AmplifierEnable.ON - if not self.wait_for_condition(_wait_enabled, check_stopped=True, timeout = self.timeout_for_pvwait): + + if not self.wait_for_condition( + _wait_enabled, check_stopped=True, timeout=self.timeout_for_pvwait + ): raise TimeoutError( f"Enabling channel run into timeout after {self.timeout_for_pvwait} seconds" ) match value: - case '1us': + case "1us": self.amp.cFilter_ENUM.put(AmplifierFilter.F1US) - case '3us': + case "3us": self.amp.cFilter_ENUM.put(AmplifierFilter.F3US) - case '10us': + case "10us": self.amp.cFilter_ENUM.put(AmplifierFilter.F10US) - case '30us': + case "30us": self.amp.cFilter_ENUM.put(AmplifierFilter.F30US) - case '100us': + case "100us": self.amp.cFilter_ENUM.put(AmplifierFilter.F100US) - case '300us': + case "300us": self.amp.cFilter_ENUM.put(AmplifierFilter.F300US) - case '1ms': + case "1ms": self.amp.cFilter_ENUM.put(AmplifierFilter.F1MS) - case '3ms': + case "3ms": self.amp.cFilter_ENUM.put(AmplifierFilter.F3MS) @typechecked - def set_hv( - self, - hv: float, - ) -> None: + def set_hv(self, hv: float) -> None: """Configure the high voltage settings , this will enable the high voltage (if external enable is active)! @@ -179,13 +181,15 @@ class IonizationChamber0(PSIDeviceBase): """ if not 0 < hv < 3000: - raise ValueError(f'specified HV {hv} not within range [0 .. 3000]') + raise ValueError(f"specified HV {hv} not within range [0 .. 3000]") if self.hv.grid_v.get() > hv: - raise ValueError(f'Grid {self.hv.grid_v.get()} must not be higher than HV {hv}!') - - if not self.hv_en.ena.get() == 1 : + raise ValueError(f"Grid {self.hv.grid_v.get()} must not be higher than HV {hv}!") + + if not self.hv_en.ena.get() == 1: + def check_ch_ena(*, old_value, value, **kwargs): return value == 1 + status = SubscriptionStatus(device=self.hv_en.ena, callback=check_ch_ena) self.hv_en.ena.put(1) # Wait after setting ena to 1 @@ -196,10 +200,7 @@ class IonizationChamber0(PSIDeviceBase): self.hv.hv_v.put(hv) @typechecked - def set_grid( - self, - grid: float, - ) -> None: + def set_grid(self, grid: float) -> None: """Configure the high voltage settings , this will enable the high voltage (if external enable is active)! @@ -208,13 +209,15 @@ class IonizationChamber0(PSIDeviceBase): """ if not 0 < grid < 3000: - raise ValueError(f'specified Grid {grid} not within range [0 .. 3000]') + raise ValueError(f"specified Grid {grid} not within range [0 .. 3000]") if grid > self.hv.hv_v.get(): - raise ValueError(f'Grid {grid} must not be higher than HV {self.hv.hv_v.get()}!') - - if not self.hv_en.ena.get() == 1 : + raise ValueError(f"Grid {grid} must not be higher than HV {self.hv.hv_v.get()}!") + + if not self.hv_en.ena.get() == 1: + def check_ch_ena(*, old_value, value, **kwargs): return value == 1 + status = SubscriptionStatus(device=self.hv_en.ena, callback=check_ch_ena) self.hv_en.ena.put(1) # Wait after setting ena to 1 @@ -227,13 +230,13 @@ class IonizationChamber0(PSIDeviceBase): @typechecked def fill( self, - gas1: Literal['He', 'N2', 'Ar', 'Kr'], + gas1: Literal["He", "N2", "Ar", "Kr"], conc1: float, - gas2: Literal['He', 'N2', 'Ar', 'Kr'], + gas2: Literal["He", "N2", "Ar", "Kr"], conc2: float, pressure: float, *, - wait:bool = False, + wait: bool = False, ) -> DeviceStatus: """Fill an ionization chamber with the specified gas mixture. @@ -247,13 +250,13 @@ class IonizationChamber0(PSIDeviceBase): """ if 100 < conc1 < 0: - raise ValueError(f'Concentration 1 {conc1} out of range [0 .. 100 %]') + raise ValueError(f"Concentration 1 {conc1} out of range [0 .. 100 %]") if 100 < conc2 < 0: - raise ValueError(f'Concentration 2 {conc2} out of range [0 .. 100 %]') - if not np.isclose((conc1+conc2), 100, atol=0.1): - raise ValueError(f"Conc1 {conc1} and conc2 {conc2} must sum to 100 +- 0.1") + raise ValueError(f"Concentration 2 {conc2} out of range [0 .. 100 %]") + if not np.isclose((conc1 + conc2), 100, atol=0.1): + raise ValueError(f"Conc1 {conc1} and conc2 {conc2} must sum to 100 +- 0.1") if 3 < pressure < 0: - raise ValueError(f'Pressure {pressure} out of range [0 .. 3 bar abs]') + raise ValueError(f"Pressure {pressure} out of range [0 .. 3 bar abs]") self.gmes.gas1_req.set(gas1).wait(timeout=3) self.gmes.conc1_req.set(conc1).wait(timeout=3) @@ -261,16 +264,23 @@ class IonizationChamber0(PSIDeviceBase): self.gmes.conc2_req.set(conc2).wait(timeout=3) self.gmes.fill.put(1) + def wait_for_status(): return self.gmes.status.get() == 0 + timeout = 3 if not self.wait_for_condition(wait_for_status, timeout=timeout, check_stopped=True): - raise TimeoutError(f"Ionization chamber filling process did not start after {timeout}s. Last log message {self.gmes.status_msg.get()}") + raise TimeoutError( + f"Ionization chamber filling process did not start after {timeout}s. Last log message {self.gmes.status_msg.get()}" + ) + def wait_for_filling_finished(): return self.gmes.status.get() == 1 - + # Wait until ionization chamber is filled successfully - status = self.task_handler.submit_task(task=self.wait_for_condition, task_args=(wait_for_filling_finished, 360, True)) + status = self.task_handler.submit_task( + task=self.wait_for_condition, task_args=(wait_for_filling_finished, 360, True) + ) if wait: status.wait() return status @@ -279,35 +289,68 @@ class IonizationChamber0(PSIDeviceBase): class IonizationChamber1(PSIDeviceBase): """Ionization Chamber 0, prefix should be 'X01DA-'.""" - num=2 + num = 2 amp_signals = { - "cOnOff": (EpicsSignal, (f"ES:AMP5004.cOnOff{num}"), {"kind":"config", "doc":f'Enable ch{num} -> IC{num-1}'}), - "cGain_ENUM": (EpicsSignalWithRBV, (f"ES:AMP5004:cGain{num}_ENUM"), {"kind":"config", "doc":f'Gain of ch{num} -> IC{num-1}'}), - "cFilter_ENUM": (EpicsSignalWithRBV, (f"ES:AMP5004:cFilter{num}_ENUM"), {"kind":"config", "doc":f'Filter of ch{num} -> IC{num-1}'}) + "cOnOff": ( + EpicsSignal, + (f"ES:AMP5004.cOnOff{num}"), + {"kind": "config", "doc": f"Enable ch{num} -> IC{num-1}"}, + ), + "cGain_ENUM": ( + EpicsSignalWithRBV, + (f"ES:AMP5004:cGain{num}_ENUM"), + {"kind": "config", "doc": f"Gain of ch{num} -> IC{num-1}"}, + ), + "cFilter_ENUM": ( + EpicsSignalWithRBV, + (f"ES:AMP5004:cFilter{num}_ENUM"), + {"kind": "config", "doc": f"Filter of ch{num} -> IC{num-1}"}, + ), } amp = Dcpt(amp_signals) gmes = Cpt(GasMixSetupControl, suffix=f"ES-GMES:IC{num-1}") hv = Cpt(HighVoltageSuppliesControl, suffix=f"ES2-IC{num-1}:") hv_en_signals = { - "ext_ena" : (EpicsSignalRO, "ES2-IC12:HV-Ext-Ena", {"kind" : "config", "doc" :'External enable signal of HV'}), - "ena" : (EpicsSignalRO, "ES2-IC12:HV-Ena", {"kind" : "config", "doc" :'Enable signal of HV'}), + "ext_ena": ( + EpicsSignalRO, + "ES2-IC12:HV-Ext-Ena", + {"kind": "config", "doc": "External enable signal of HV"}, + ), + "ena": (EpicsSignalRO, "ES2-IC12:HV-Ena", {"kind": "config", "doc": "Enable signal of HV"}), } hv_en = Dcpt(hv_en_signals) + class IonizationChamber2(PSIDeviceBase): """Ionization Chamber 0, prefix should be 'X01DA-'.""" - num=3 + num = 3 amp_signals = { - "cOnOff": (EpicsSignal, (f"ES:AMP5004.cOnOff{num}"), {"kind":"config", "doc":f'Enable ch{num} -> IC{num-1}'}), - "cGain_ENUM": (EpicsSignalWithRBV, (f"ES:AMP5004:cGain{num}_ENUM"), {"kind":"config", "doc":f'Gain of ch{num} -> IC{num-1}'}), - "cFilter_ENUM": (EpicsSignalWithRBV, (f"ES:AMP5004:cFilter{num}_ENUM"), {"kind":"config", "doc":f'Filter of ch{num} -> IC{num-1}'}) + "cOnOff": ( + EpicsSignal, + (f"ES:AMP5004.cOnOff{num}"), + {"kind": "config", "doc": f"Enable ch{num} -> IC{num-1}"}, + ), + "cGain_ENUM": ( + EpicsSignalWithRBV, + (f"ES:AMP5004:cGain{num}_ENUM"), + {"kind": "config", "doc": f"Gain of ch{num} -> IC{num-1}"}, + ), + "cFilter_ENUM": ( + EpicsSignalWithRBV, + (f"ES:AMP5004:cFilter{num}_ENUM"), + {"kind": "config", "doc": f"Filter of ch{num} -> IC{num-1}"}, + ), } amp = Dcpt(amp_signals) gmes = Cpt(GasMixSetupControl, suffix=f"ES-GMES:IC{num-1}") hv = Cpt(HighVoltageSuppliesControl, suffix=f"ES2-IC{num-1}:") hv_en_signals = { - "ext_ena" : (EpicsSignalRO, "ES2-IC12:HV-Ext-Ena", {"kind" : "config", "doc" :'External enable signal of HV'}), - "ena" : (EpicsSignalRO, "ES2-IC12:HV-Ena", {"kind" : "config", "doc" :'Enable signal of HV'}), + "ext_ena": ( + EpicsSignalRO, + "ES2-IC12:HV-Ext-Ena", + {"kind": "config", "doc": "External enable signal of HV"}, + ), + "ena": (EpicsSignalRO, "ES2-IC12:HV-Ena", {"kind": "config", "doc": "Enable signal of HV"}), } - hv_en = Dcpt(hv_en_signals) \ No newline at end of file + hv_en = Dcpt(hv_en_signals) diff --git a/debye_bec/devices/ionization_chambers/ionization_chamber_enums.py b/debye_bec/devices/ionization_chambers/ionization_chamber_enums.py index 06f0da4..2cd7ecb 100644 --- a/debye_bec/devices/ionization_chambers/ionization_chamber_enums.py +++ b/debye_bec/devices/ionization_chambers/ionization_chamber_enums.py @@ -1,5 +1,6 @@ import enum + class AmplifierEnable(int, enum.Enum): """Enum class for the enable signal of the channel""" @@ -7,6 +8,7 @@ class AmplifierEnable(int, enum.Enum): STARTUP = 1 ON = 2 + class AmplifierGain(int, enum.Enum): """Enum class for the gain of the channel""" @@ -16,14 +18,15 @@ class AmplifierGain(int, enum.Enum): G1E8 = 3 G1E9 = 4 + class AmplifierFilter(int, enum.Enum): """Enum class for the filter of the channel""" - F1US = 0 - F3US = 1 - F10US = 2 - F30US = 3 + F1US = 0 + F3US = 1 + F10US = 2 + F30US = 3 F100US = 4 F300US = 5 - F1MS = 6 - F3MS = 7 \ No newline at end of file + F1MS = 6 + F3MS = 7 diff --git a/debye_bec/devices/mo1_bragg/mo1_bragg.py b/debye_bec/devices/mo1_bragg/mo1_bragg.py index 178a8ba..7c2f8e4 100644 --- a/debye_bec/devices/mo1_bragg/mo1_bragg.py +++ b/debye_bec/devices/mo1_bragg/mo1_bragg.py @@ -323,7 +323,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): while time.time() - start_time < timeout: if signal.get() == value: return None - if self.stopped is True: # Should this check be optional or configurable?! + if self.stopped is True: # Should this check be optional or configurable?! raise DeviceStopError(f"Device {self.name} was stopped while waiting for signal") time.sleep(0.1) # If we end up here, the status did not resolve @@ -445,7 +445,6 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): if hasattr(self.scan_parameter, key): setattr(self.scan_parameter, key, value) - def _check_scan_msg(self, target_state: ScanControlLoadMessage) -> None: """Check if the scan message is gettting available diff --git a/debye_bec/devices/mo1_bragg/mo1_bragg_utils.py b/debye_bec/devices/mo1_bragg/mo1_bragg_utils.py index 8ec5ae8..38d8409 100644 --- a/debye_bec/devices/mo1_bragg/mo1_bragg_utils.py +++ b/debye_bec/devices/mo1_bragg/mo1_bragg_utils.py @@ -1,31 +1,27 @@ -""" Module for additional utils of the Mo1 Bragg Positioner""" +"""Module for additional utils of the Mo1 Bragg Positioner""" import numpy as np from scipy.interpolate import BSpline ################ Define Constants ############ -SAFETY_FACTOR = 0.025 # safety factor to limit acceleration -> NEVER SET TO ZERO ! -N_SAMPLES = 41 # number of samples to generate -> Always choose uneven number, - # otherwise peak value will not be included -DEGREE_SPLINE = 3 # DEGREE_SPLINE of spline, 3 works good -TIME_COMPENSATE_SPLINE = 0.0062 # time to be compensated each spline in s -POSITION_COMPONSATION = 0.02 # angle to add at both limits, must be same values - # as used on ACS controller for simple scans +SAFETY_FACTOR = 0.025 # safety factor to limit acceleration -> NEVER SET TO ZERO ! +N_SAMPLES = 41 # number of samples to generate -> Always choose uneven number, +# otherwise peak value will not be included +DEGREE_SPLINE = 3 # DEGREE_SPLINE of spline, 3 works good +TIME_COMPENSATE_SPLINE = 0.0062 # time to be compensated each spline in s +POSITION_COMPONSATION = 0.02 # angle to add at both limits, must be same values +# as used on ACS controller for simple scans class Mo1UtilsSplineError(Exception): - """ Exception for spline computation""" + """Exception for spline computation""" def compute_spline( - low_deg:float, - high_deg:float, - p_kink:float, - e_kink_deg:float, - scan_time:float, + low_deg: float, high_deg: float, p_kink: float, e_kink_deg: float, scan_time: float ) -> tuple[float, float, float]: - """ Spline computation for the advanced scan mode - + """Spline computation for the advanced scan mode + Args: low_deg (float): Low angle value of the scan in deg high_deg (float): High angle value of the scan in deg @@ -42,53 +38,56 @@ def compute_spline( high_deg = high_deg + POSITION_COMPONSATION if p_kink < 0 or p_kink > 100: - raise Mo1UtilsSplineError("Kink position not within range of [0..100%]"+ - f"for p_kink: {p_kink}") + raise Mo1UtilsSplineError( + "Kink position not within range of [0..100%]" + f"for p_kink: {p_kink}" + ) if e_kink_deg < low_deg or e_kink_deg > high_deg: - raise Mo1UtilsSplineError("Kink energy not within selected energy range of scan,"+ - f"for e_kink_deg {e_kink_deg}, low_deg {low_deg} and"+ - f"high_deg {high_deg}.") + raise Mo1UtilsSplineError( + "Kink energy not within selected energy range of scan," + + f"for e_kink_deg {e_kink_deg}, low_deg {low_deg} and" + + f"high_deg {high_deg}." + ) tc1 = SAFETY_FACTOR / scan_time * TIME_COMPENSATE_SPLINE - t_kink = (scan_time - TIME_COMPENSATE_SPLINE - 2*(SAFETY_FACTOR - tc1)) * p_kink/100 + (SAFETY_FACTOR - tc1) + t_kink = (scan_time - TIME_COMPENSATE_SPLINE - 2 * (SAFETY_FACTOR - tc1)) * p_kink / 100 + ( + SAFETY_FACTOR - tc1 + ) - t_input = [0, - SAFETY_FACTOR - tc1, - t_kink, - scan_time - TIME_COMPENSATE_SPLINE - SAFETY_FACTOR + tc1, - scan_time - TIME_COMPENSATE_SPLINE] - p_input = [0, - 0, - e_kink_deg - low_deg, - high_deg - low_deg, - high_deg - low_deg] + t_input = [ + 0, + SAFETY_FACTOR - tc1, + t_kink, + scan_time - TIME_COMPENSATE_SPLINE - SAFETY_FACTOR + tc1, + scan_time - TIME_COMPENSATE_SPLINE, + ] + p_input = [0, 0, e_kink_deg - low_deg, high_deg - low_deg, high_deg - low_deg] - cv = np.stack((t_input, p_input)).T # spline coefficients + cv = np.stack((t_input, p_input)).T # spline coefficients max_param = len(cv) - DEGREE_SPLINE - kv = np.clip(np.arange(len(cv)+DEGREE_SPLINE+1)-DEGREE_SPLINE,0,max_param) # knots - spl = BSpline(kv, cv, DEGREE_SPLINE) # get spline function - p = spl(np.linspace(0,max_param,N_SAMPLES)) - v = spl(np.linspace(0,max_param,N_SAMPLES), 1) - a = spl(np.linspace(0,max_param,N_SAMPLES), 2) - j = spl(np.linspace(0,max_param,N_SAMPLES), 3) + kv = np.clip(np.arange(len(cv) + DEGREE_SPLINE + 1) - DEGREE_SPLINE, 0, max_param) # knots + spl = BSpline(kv, cv, DEGREE_SPLINE) # get spline function + p = spl(np.linspace(0, max_param, N_SAMPLES)) + v = spl(np.linspace(0, max_param, N_SAMPLES), 1) + a = spl(np.linspace(0, max_param, N_SAMPLES), 2) + j = spl(np.linspace(0, max_param, N_SAMPLES), 3) tim, pos = p.T pos = pos + low_deg - vel = v[:,1]/v[:,0] + vel = v[:, 1] / v[:, 0] acc = [] for item in a: - acc.append(0) if item[1] == 0 else acc.append(item[1]/item[0]) + acc.append(0) if item[1] == 0 else acc.append(item[1] / item[0]) jerk = [] for item in j: - jerk.append(0) if item[1] == 0 else jerk.append(item[1]/item[0]) + jerk.append(0) if item[1] == 0 else jerk.append(item[1] / item[0]) dt = np.zeros(len(tim)) for i in np.arange(len(tim)): if i == 0: dt[i] = 0 else: - dt[i] = 1000*(tim[i]-tim[i-1]) + dt[i] = 1000 * (tim[i] - tim[i - 1]) return pos, vel, dt diff --git a/debye_bec/devices/pilatus_curtain.py b/debye_bec/devices/pilatus_curtain.py index 35bcfc9..94c05b8 100644 --- a/debye_bec/devices/pilatus_curtain.py +++ b/debye_bec/devices/pilatus_curtain.py @@ -1,35 +1,27 @@ -""" ES2 Pilatus Curtain""" +"""ES2 Pilatus Curtain""" import time from ophyd import Component as Cpt -from ophyd import Device, Kind, EpicsSignal, EpicsSignalRO +from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind from ophyd_devices.utils import bec_utils + class GasMixSetup(Device): """Class for the ES2 Pilatus Curtain""" - USER_ACCESS = ['open', 'close'] + USER_ACCESS = ["open", "close"] - open_cover = Cpt( - EpicsSignal, suffix="OpenCover", kind="config", doc='Open Cover' - ) - close_cover = Cpt( - EpicsSignal, suffix="CloseCover", kind="config", doc='Close Cover' - ) + open_cover = Cpt(EpicsSignal, suffix="OpenCover", kind="config", doc="Open Cover") + close_cover = Cpt(EpicsSignal, suffix="CloseCover", kind="config", doc="Close Cover") cover_is_closed = Cpt( - EpicsSignalRO, suffix="CoverIsClosed", kind="config", doc='Cover is closed' - ) - cover_is_open = Cpt( - EpicsSignalRO, suffix="CoverIsOpen", kind="config", doc='Cover is open' + EpicsSignalRO, suffix="CoverIsClosed", kind="config", doc="Cover is closed" ) + cover_is_open = Cpt(EpicsSignalRO, suffix="CoverIsOpen", kind="config", doc="Cover is open") cover_is_moving = Cpt( - EpicsSignalRO, suffix="CoverIsMoving", kind="config", doc='Cover is moving' + EpicsSignalRO, suffix="CoverIsMoving", kind="config", doc="Cover is moving" ) - cover_error = Cpt( - EpicsSignalRO, suffix="CoverError", kind="config", doc='Cover error' - ) - + cover_error = Cpt(EpicsSignalRO, suffix="CoverError", kind="config", doc="Cover error") def __init__( self, prefix="", *, name: str, kind: Kind = None, device_manager=None, parent=None, **kwargs @@ -68,7 +60,7 @@ class GasMixSetup(Device): while not self.cover_is_open.get(): time.sleep(0.1) if self.cover_error.get(): - raise TimeoutError('Curtain did not open successfully and is now in an error state') + raise TimeoutError("Curtain did not open successfully and is now in an error state") def close(self) -> None: """Close the cover""" @@ -78,4 +70,6 @@ class GasMixSetup(Device): while not self.cover_is_closed.get(): time.sleep(0.1) if self.cover_error.get(): - raise TimeoutError('Curtain did not close successfully and is now in an error state') + raise TimeoutError( + "Curtain did not close successfully and is now in an error state" + ) diff --git a/debye_bec/scans/mono_bragg_scans.py b/debye_bec/scans/mono_bragg_scans.py index d6de55d..57ed477 100644 --- a/debye_bec/scans/mono_bragg_scans.py +++ b/debye_bec/scans/mono_bragg_scans.py @@ -2,6 +2,7 @@ import time from typing import Literal + import numpy as np from bec_lib.device import DeviceBase from bec_lib.logger import bec_logger @@ -59,7 +60,7 @@ class XASSimpleScan(AsyncFlyScanBase): def update_readout_priority(self): """Ensure that NIDAQ is not monitored for any quick EXAFS.""" super().update_readout_priority() - self.readout_priority['async'].append('nidaq') + self.readout_priority["async"].append("nidaq") def prepare_positions(self): """Prepare the positions for the scan.