Mono trigger update #49

Merged
appel_c merged 8 commits from mono-trigger-update into main 2025-03-11 10:17:43 +01:00
11 changed files with 1240 additions and 131 deletions

View File

@@ -360,15 +360,15 @@ fm_ztcp:
onFailure: retry
enabled: true
softwareTrigger: false
fm_xstripe:
readoutPriority: baseline
description: Focusing Morror X Stripe
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:XSTRIPE
onFailure: retry
enabled: true
softwareTrigger: false
# fm_xstripe:
# readoutPriority: baseline
# description: Focusing Morror X Stripe
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X01DA-OP-FM:XSTRIPE
# onFailure: retry
# enabled: true
# softwareTrigger: false
## Optics Slits 1 -- Physical positioners
@@ -594,6 +594,20 @@ ot_pitch:
enabled: true
softwareTrigger: false
#########################################
## Exit Window -- Physical Positioners ##
#########################################
es0wi_try:
readoutPriority: baseline
description: End Station 0 Exit Window Y-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES0-WI:TRY
onFailure: retry
enabled: true
softwareTrigger: false
###############################################
## End Station Slits -- Physical Positioners ##
###############################################
@@ -676,17 +690,186 @@ es0sl_gapy:
enabled: true
softwareTrigger: false
#########################################################
## Pinhole and alignment laser -- Physical Positioners ##
#########################################################
#########################################
## Exit Window -- Physical Positioners ##
#########################################
es0wi_try:
es1pin_try:
readoutPriority: baseline
description: End Station 0 Exit Window Y-translation
description: End Station pinhole and alignment laser Y-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES0-WI:TRY
prefix: X01DA-ES1-PIN1:TRY
onFailure: retry
enabled: true
softwareTrigger: false
softwareTrigger: false
es1pin_trx:
readoutPriority: baseline
description: End Station pinhole and alignment laser X-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES1-PIN1:TRX
onFailure: retry
enabled: true
softwareTrigger: false
es1pin_rotx:
readoutPriority: baseline
description: End Station pinhole and alignment laser X-rotation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES1-PIN1:ROTX
onFailure: retry
enabled: true
softwareTrigger: false
es1pin_roty:
readoutPriority: baseline
description: End Station pinhole and alignment laser Y-rotation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES1-PIN1:ROTY
onFailure: retry
enabled: true
softwareTrigger: false
################################################
## Sample Manipulator -- Physical Positioners ##
################################################
es1man_trx:
readoutPriority: baseline
description: End Station sample manipulator X-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES1-MAN1:TRX
onFailure: retry
enabled: true
softwareTrigger: false
es1man_try:
readoutPriority: baseline
description: End Station sample manipulator Y-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES1-MAN1:TRY
onFailure: retry
enabled: true
softwareTrigger: false
es1man_trz:
readoutPriority: baseline
description: End Station sample manipulator Z-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES1-MAN1:TRZ
onFailure: retry
enabled: true
softwareTrigger: false
es1man_roty:
readoutPriority: baseline
description: End Station sample manipulator Y-rotation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES1-MAN1:ROTY
onFailure: retry
enabled: true
softwareTrigger: false
############################################
## Segemented Arc -- Physical Positioners ##
############################################
es1arc_roty:
readoutPriority: baseline
description: End Station segmented arc Y-rotation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES1-ARC:ROTY
onFailure: retry
enabled: true
softwareTrigger: false
es1det1_trx:
readoutPriority: baseline
description: End Station SDD 1 X-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES1-DET1:TRX
onFailure: retry
enabled: true
softwareTrigger: false
es1bm1_trx:
readoutPriority: baseline
description: End Station X-ray Eye X-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES1-BM1:TRX
onFailure: retry
enabled: true
softwareTrigger: false
es1det2_trx:
readoutPriority: baseline
description: End Station SDD 2 X-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES1-DET2:TRX
onFailure: retry
enabled: true
softwareTrigger: false
#######################################
## Beam Stop -- Physical Positioners ##
#######################################
es2bs_trx:
readoutPriority: baseline
description: End Station beamstop X-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES2-BS:TRX
onFailure: retry
enabled: true
softwareTrigger: false
es2bs_try:
readoutPriority: baseline
description: End Station beamstop Y-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES2-BS:TRY
onFailure: retry
enabled: true
softwareTrigger: false
##############################################
## IC12 Manipulator -- Physical Positioners ##
##############################################
es2ma2_try:
readoutPriority: baseline
description: End Station ionization chamber 1+2 Y-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES2-MA2:TRY
onFailure: retry
enabled: true
softwareTrigger: false
es2ma2_trz:
readoutPriority: baseline
description: End Station ionization chamber 1+2 Z-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES2-MA2:TRZ
onFailure: retry
enabled: true
softwareTrigger: false
#######################################################
## XRD Detector Manipulator -- Physical Positioners ##
#######################################################
es2ma3_try:
readoutPriority: baseline
description: End Station XRD detector Y-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES2-MA3:TRY
onFailure: retry
enabled: true
softwareTrigger: false

View File

@@ -17,6 +17,8 @@ dummy_pv:
onFailure: retry
enabled: true
softwareTrigger: false
## NIDAQ
nidaq:
readoutPriority: async
description: NIDAQ backend for data reading for debye scans
@@ -27,3 +29,171 @@ nidaq:
enabled: true
softwareTrigger: false
# ES0 Filter
# es0filter:
# readoutPriority: async
# description: ES0 filter station
# deviceClass: debye_bec.devices.es0filter.ES0Filter
# deviceConfig:
# prefix: "X01DA-ES0-FI:"
# onFailure: retry
# enabled: true
# softwareTrigger: false
# Current amplifiers
# amplifiers:
# readoutPriority: async
# description: ES current amplifiers
# deviceClass: debye_bec.devices.amplifiers.Amplifiers
# deviceConfig:
# prefix: "X01DA-ES:AMP5004"
# onFailure: retry
# enabled: true
# softwareTrigger: false
# HV power supplies
# hv_supplies:
# readoutPriority: async
# description: HV power supplies
# deviceClass: debye_bec.devices.hv_supplies.HVSupplies
# deviceConfig:
# prefix: "X01DA-"
# onFailure: retry
# enabled: true
# softwareTrigger: false
# Gas Mix Setup
# gas_mix_setup:
# readoutPriority: async
# description: Gas Mix Setup for Ionization Chambers
# deviceClass: debye_bec.devices.gas_mix_setup.GasMixSetup
# deviceConfig:
# prefix: "X01DA-ES-GMES:"
# onFailure: retry
# enabled: true
# softwareTrigger: false
# Pilatus Curtain
# pilatus_curtain:
# readoutPriority: async
# description: Pilatus Curtain
# deviceClass: debye_bec.devices.pilatus_curtain.PilatusCurtain
# deviceConfig:
# prefix: "X01DA-ES2-DET3:TRY-"
# onFailure: retry
# enabled: true
# softwareTrigger: false
################################
## ES Hutch Sensors and Light ##
################################
es_temperature1:
readoutPriority: monitored
description: ES temperature sensor 1
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
read_pv: "X01DA-PC-I2C:_CH1:TEMP"
onFailure: retry
enabled: true
softwareTrigger: false
es_humidity1:
readoutPriority: monitored
description: ES humidity sensor 1
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
read_pv: "X01DA-PC-I2C:_CH1:HUMIREL"
onFailure: retry
enabled: true
softwareTrigger: false
es_pressure1:
readoutPriority: monitored
description: ES ambient pressure sensor 1
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
read_pv: "X01DA-PC-I2C:_CH1:PRES"
onFailure: retry
enabled: true
softwareTrigger: false
es_temperature2:
readoutPriority: monitored
description: ES temperature sensor 2
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
read_pv: "X01DA-PC-I2C:_CH2:TEMP"
onFailure: retry
enabled: true
softwareTrigger: false
es_humidity2:
readoutPriority: monitored
description: ES humidity sensor 2
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
read_pv: "X01DA-PC-I2C:_CH2:HUMIREL"
onFailure: retry
enabled: true
softwareTrigger: false
es_pressure2:
readoutPriority: monitored
description: ES ambient pressure sensor 2
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
read_pv: "X01DA-PC-I2C:_CH2:PRES"
onFailure: retry
enabled: true
softwareTrigger: false
es_light_toggle:
readoutPriority: monitored
description: ES light toggle
deviceClass: ophyd.EpicsSignal
deviceConfig:
read_pv: "X01DA-EH-LIGHT:TOGGLE"
onFailure: retry
enabled: true
softwareTrigger: false
#################
## SDD sensors ##
#################
sdd1_temperature:
readoutPriority: monitored
description: SDD1 temperature sensor
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
read_pv: "X01DA-ES1-DET1:Temperature"
onFailure: retry
enabled: true
softwareTrigger: false
sdd1_humidity:
readoutPriority: monitored
description: SDD1 humidity sensor
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
read_pv: "X01DA-ES1-DET1:Humidity"
onFailure: retry
enabled: true
softwareTrigger: false
#####################
## Alignment Laser ##
#####################
es1_alignment_laser:
readoutPriority: monitored
description: ES1 alignment laser
deviceClass: ophyd.EpicsSignal
deviceConfig:
read_pv: "X01DA-ES1-LAS:Relay"
onFailure: retry
enabled: true
softwareTrigger: false

View File

@@ -0,0 +1,186 @@
""" ES Current amplifiers"""
import enum
from typing import Literal
from ophyd import Component as Cpt
from ophyd import Device, Kind, EpicsSignalWithRBV
from ophyd_devices.utils import bec_utils
class Enable(int, enum.Enum):
"""Enum class for the enable signal of the channel"""
OFF = 0
STARTUP = 1
ON = 2
class Gain(int, enum.Enum):
"""Enum class for the gain of the channel"""
G1E6 = 0
G1E7 = 1
G5E7 = 2
G1E8 = 3
G1E9 = 4
class Filter(int, enum.Enum):
"""Enum class for the filter of the channel"""
F1US = 0
F3US = 1
F10US = 2
F30US = 3
F100US = 4
F300US = 5
F1MS = 6
F3MS = 7
class Amplifiers(Device):
"""Class for the ES current amplifiers"""
USER_ACCESS = ['set_channel']
ic0_enable = Cpt(
EpicsSignalWithRBV, suffix=".cOnOff1", kind="config", doc='Enable ch1 -> IC0'
)
ic0_gain = Cpt(
EpicsSignalWithRBV, suffix=":cGain1_ENUM", kind="config", doc='Gain of ch1 -> IC0'
)
ic0_filter = Cpt(
EpicsSignalWithRBV, suffix=":cFilter1_ENUM", kind="config", doc='Filter of ch1 -> IC0'
)
ic1_enable = Cpt(
EpicsSignalWithRBV, suffix=".cOnOff2", kind="config", doc='Enable ch2 -> IC1'
)
ic1_gain = Cpt(
EpicsSignalWithRBV, suffix=":cGain2_ENUM", kind="config", doc='Gain of ch2 -> IC1'
)
ic1_filter = Cpt(
EpicsSignalWithRBV, suffix=":cFilter2_ENUM", kind="config", doc='Filter of ch2 -> IC1'
)
ic2_enable = Cpt(
EpicsSignalWithRBV, suffix=".cOnOff3", kind="config", doc='Enable ch3 -> IC2'
)
ic2_gain = Cpt(
EpicsSignalWithRBV, suffix=":cGain3_ENUM", kind="config", doc='Gain of ch3 -> IC2'
)
ic2_filter = Cpt(
EpicsSignalWithRBV, suffix=":cFilter3_ENUM", kind="config", doc='Filter of ch3 -> IC2'
)
pips_enable = Cpt(
EpicsSignalWithRBV, suffix=".cOnOff4", kind="config", doc='Enable ch4 -> PIPS'
)
pips_gain = Cpt(
EpicsSignalWithRBV, suffix=":cGain4_ENUM", kind="config", doc='Gain of ch4 -> PIPS'
)
pips_filter = Cpt(
EpicsSignalWithRBV, suffix=":cFilter4_ENUM", kind="config", doc='Filter of ch4 -> PIPS'
)
def __init__(
self, prefix="", *, name: str, kind: Kind = None, device_manager=None, parent=None, **kwargs
):
"""Initialize the Current Amplifiers.
Args:
prefix (str): EPICS prefix for the device
name (str): Name of the device
kind (Kind): Kind of the device
device_manager (DeviceManager): Device manager instance
parent (Device): Parent device
kwargs: Additional keyword arguments
"""
super().__init__(prefix, name=name, kind=kind, parent=parent, **kwargs)
self.device_manager = device_manager
self.service_cfg = None
self.timeout_for_pvwait = 2.5
self.readback.name = self.name
# Wait for connection on all components, ensure IOC is connected
self.wait_for_connection(all_signals=True, timeout=5)
if device_manager:
self.device_manager = device_manager
else:
self.device_manager = bec_utils.DMMock()
self.connector = self.device_manager.connector
def set_channel(
self,
detector: Literal['ic0', 'ic1', 'ic2', 'pips'],
gain: Literal['1e6', '1e7', '5e7', '1e8', '1e9'],
filter: Literal['1us', '3us', '10us', '30us', '100us', '300us', '1ms', '3ms']
) -> None:
"""Configure the gain setting of the specified channel
Args:
detector (Literal['ic0', 'ic1', 'ic2', 'pips']) : Detector
gain (Literal['1e6', '1e7', '5e7', '1e8', '1e9']) : Desired gain
filter (Literal['1us', '3us', '10us', '30us', '100us', '300us', '1ms', '3ms']) : Desired filter
"""
ch_enable = None
ch_gain = None
ch_filter = None
match detector:
case 'ic0':
ch_enable = self.ic0_enable
ch_gain = self.ic0_gain
ch_filter = self.ic0_filter
case 'ic1':
ch_enable = self.ic1_enable
ch_gain = self.ic1_gain
ch_filter = self.ic1_filter
case 'ic2':
ch_enable = self.ic2_enable
ch_gain = self.ic2_gain
ch_filter = self.ic2_filter
case 'pips':
ch_enable = self.pips_enable
ch_gain = self.pips_gain
ch_filter = self.pips_filter
ch_enable.put(Enable.ON)
# Wait until channel is switched on
if not self.wait_for_signals(
signal_conditions=[(ch_enable.get, Enable.ON)],
timeout=self.timeout_for_pvwait,
check_stopped=True,
):
raise TimeoutError(
f"Enabling channel run into timeout after {self.timeout_for_pvwait} seconds"
)
match gain:
case '1e6':
ch_gain.put(Gain.G1E6)
case '1e7':
ch_gain.put(Gain.G1E7)
case '5e7':
ch_gain.put(Gain.G5E7)
case '1e8':
ch_gain.put(Gain.G1E8)
case '1e9':
ch_gain.put(Gain.G1E9)
match filter:
case '1us':
ch_filter.put(Filter.F1US)
case '3us':
ch_filter.put(Filter.F3US)
case '10us':
ch_filter.put(Filter.F10US)
case '30us':
ch_filter.put(Filter.F30US)
case '100us':
ch_filter.put(Filter.F100US)
case '300us':
ch_filter.put(Filter.F300US)
case '1ms':
ch_filter.put(Filter.F1MS)
case '3ms':
ch_filter.put(Filter.F3MS)

View File

@@ -0,0 +1,89 @@
""" ES0 Filter Station"""
from ophyd import Component as Cpt
from ophyd import Device, Kind, EpicsSignalWithRBV
from ophyd_devices.utils import bec_utils
class ES0Filter(Device):
"""Class for the ES0 filter station"""
USER_ACCESS = ['set_filters']
filter_output = Cpt(
EpicsSignalWithRBV,
suffix="BIO",
kind="config",
doc='Packed value of filter positions'
)
def __init__(
self, prefix="", *, name: str, kind: Kind = None, device_manager=None, parent=None, **kwargs
):
"""Initialize the ES0 Filter Station.
Args:
prefix (str): EPICS prefix for the device
name (str): Name of the device
kind (Kind): Kind of the device
device_manager (DeviceManager): Device manager instance
parent (Device): Parent device
kwargs: Additional keyword arguments
"""
super().__init__(prefix, name=name, kind=kind, parent=parent, **kwargs)
self.device_manager = device_manager
self.service_cfg = None
self.timeout_for_pvwait = 2.5
self.readback.name = self.name
# Wait for connection on all components, ensure IOC is connected
self.wait_for_connection(all_signals=True, timeout=5)
if device_manager:
self.device_manager = device_manager
else:
self.device_manager = bec_utils.DMMock()
self.connector = self.device_manager.connector
def set_filters(self, filters: list) -> None:
"""Configure the filters according to the list
Args:
filters (list) : List of strings representing the filters, e.g. ['Mo400', 'Al20']
"""
output = 0
for filter in filters:
match filter:
case 'Mo400':
output = output & (1 << 1)
case 'Mo300':
output = output & (1 << 2)
case 'Mo200':
output = output & (1 << 3)
case 'Zn500':
output = output & (1 << 4)
case 'Zn250':
output = output & (1 << 5)
case 'Zn125':
output = output & (1 << 6)
case 'Zn50':
output = output & (1 << 7)
case 'Zn25':
output = output & (1 << 8)
case 'Al500':
output = output & (1 << 9)
case 'Al320':
output = output & (1 << 10)
case 'Al200':
output = output & (1 << 11)
case 'Al100':
output = output & (1 << 12)
case 'Al50':
output = output & (1 << 13)
case 'Al20':
output = output & (1 << 14)
case 'Al10':
output = output & (1 << 15)
self.filter_output.put(output)

View File

@@ -0,0 +1,236 @@
""" ES Gas Mix Setup"""
import time
from typing import Literal
from ophyd import Component as Cpt
from ophyd import Device, Kind, EpicsSignalWithRBV, EpicsSignal, EpicsSignalRO
from ophyd_devices.utils import bec_utils
class GasMixSetup(Device):
"""Class for the ES HGas Mix Setup"""
USER_ACCESS = ['fill_ic']
# IC0
ic0_gas1_req = Cpt(
EpicsSignalWithRBV, suffix="IC0Gas1Req", kind="config", doc='IC0 Gas 1 requirement'
)
ic0_conc1_req = Cpt(
EpicsSignalWithRBV, suffix="IC0Conc1Req", kind="config", doc='IC0 Concentration 1 requirement'
)
ic0_gas2_req = Cpt(
EpicsSignalWithRBV, suffix="IC0Gas2Req", kind="config", doc='IC0 Gas 2 requirement'
)
ic0_conc2_req = Cpt(
EpicsSignalWithRBV, suffix="IC0Conc2Req", kind="config", doc='IC0 Concentration 2 requirement'
)
ic0_press_req = Cpt(
EpicsSignalWithRBV, suffix="IC0PressReq", kind="config", doc='IC0 Pressure requirement'
)
ic0_fill = Cpt(
EpicsSignal, suffix="IC0Fill", kind="config", doc='Fill IC0'
)
ic0_status = Cpt(
EpicsSignalRO, suffix="IC0Status", kind="config", doc='Status of IC0'
)
ic0_gas1 = Cpt(
EpicsSignalRO, suffix="IC0Gas1", kind="config", doc='IC0 Gas 1'
)
ic0_conc1 = Cpt(
EpicsSignalRO, suffix="IC0Conc1", kind="config", doc='IC0 Concentration 1'
)
ic0_gas2 = Cpt(
EpicsSignalRO, suffix="IC0Gas2", kind="config", doc='IC0 Gas 2'
)
ic0_conc2 = Cpt(
EpicsSignalRO, suffix="IC0Conc2", kind="config", doc='IC0 Concentration 2'
)
ic0_press = Cpt(
EpicsSignalRO, suffix="IC0PressTransm", kind="config", doc='Current IC0 Pressure'
)
# IC1
ic1_gas1_req = Cpt(
EpicsSignalWithRBV, suffix="IC1Gas1Req", kind="config", doc='IC1 Gas 1 requirement'
)
ic1_conc1_req = Cpt(
EpicsSignalWithRBV, suffix="IC1Conc1Req", kind="config", doc='IC1 Concentration 1 requirement'
)
ic1_gas2_req = Cpt(
EpicsSignalWithRBV, suffix="IC1Gas2Req", kind="config", doc='IC1 Gas 2 requirement'
)
ic1_conc2_req = Cpt(
EpicsSignalWithRBV, suffix="IC1Conc2Req", kind="config", doc='IC1 Concentration 2 requirement'
)
ic1_press_req = Cpt(
EpicsSignalWithRBV, suffix="IC1PressReq", kind="config", doc='IC1 Pressure requirement'
)
ic1_fill = Cpt(
EpicsSignal, suffix="IC1Fill", kind="config", doc='Fill IC1'
)
ic1_status = Cpt(
EpicsSignalRO, suffix="IC1Status", kind="config", doc='Status of IC1'
)
ic1_gas1 = Cpt(
EpicsSignalRO, suffix="IC1Gas1", kind="config", doc='IC1 Gas 1'
)
ic1_conc1 = Cpt(
EpicsSignalRO, suffix="IC1Conc1", kind="config", doc='IC1 Concentration 1'
)
ic1_gas2 = Cpt(
EpicsSignalRO, suffix="IC1Gas2", kind="config", doc='IC1 Gas 2'
)
ic1_conc2 = Cpt(
EpicsSignalRO, suffix="IC1Conc2", kind="config", doc='IC1 Concentration 2'
)
ic1_press = Cpt(
EpicsSignalRO, suffix="IC1PressTransm", kind="config", doc='Current IC1 Pressure'
)
# IC2
ic2_gas1_req = Cpt(
EpicsSignalWithRBV, suffix="IC2Gas1Req", kind="config", doc='IC2 Gas 1 requirement'
)
ic2_conc1_req = Cpt(
EpicsSignalWithRBV, suffix="IC2Conc1Req", kind="config", doc='IC2 Concentration 1 requirement'
)
ic2_gas2_req = Cpt(
EpicsSignalWithRBV, suffix="IC2Gas2Req", kind="config", doc='IC2 Gas 2 requirement'
)
ic2_conc2_req = Cpt(
EpicsSignalWithRBV, suffix="IC2Conc2Req", kind="config", doc='IC2 Concentration 2 requirement'
)
ic2_press_req = Cpt(
EpicsSignalWithRBV, suffix="IC2PressReq", kind="config", doc='IC2 Pressure requirement'
)
ic2_fill = Cpt(
EpicsSignal, suffix="IC2Fill", kind="config", doc='Fill IC2'
)
ic2_status = Cpt(
EpicsSignalRO, suffix="IC2Status", kind="config", doc='Status of IC2'
)
ic2_gas1 = Cpt(
EpicsSignalRO, suffix="IC2Gas1", kind="config", doc='IC2 Gas 1'
)
ic2_conc1 = Cpt(
EpicsSignalRO, suffix="IC2Conc1", kind="config", doc='IC2 Concentration 1'
)
ic2_gas2 = Cpt(
EpicsSignalRO, suffix="IC2Gas2", kind="config", doc='IC2 Gas 2'
)
ic2_conc2 = Cpt(
EpicsSignalRO, suffix="IC2Conc2", kind="config", doc='IC2 Concentration 2'
)
ic2_press = Cpt(
EpicsSignalRO, suffix="IC2PressTransm", kind="config", doc='Current IC2 Pressure'
)
def __init__(
self, prefix="", *, name: str, kind: Kind = None, device_manager=None, parent=None, **kwargs
):
"""Initialize the Gas Mix Setup.
Args:
prefix (str): EPICS prefix for the device
name (str): Name of the device
kind (Kind): Kind of the device
device_manager (DeviceManager): Device manager instance
parent (Device): Parent device
kwargs: Additional keyword arguments
"""
super().__init__(prefix, name=name, kind=kind, parent=parent, **kwargs)
self.device_manager = device_manager
self.service_cfg = None
self.timeout_for_pvwait = 360
self.readback.name = self.name
# Wait for connection on all components, ensure IOC is connected
self.wait_for_connection(all_signals=True, timeout=5)
if device_manager:
self.device_manager = device_manager
else:
self.device_manager = bec_utils.DMMock()
self.connector = self.device_manager.connector
def fill_ic(
self,
detector: Literal['ic0', 'ic1', 'ic2'],
gas1: Literal['He', 'LN2', 'Ar', 'Kr'],
conc1: float,
gas2: Literal['He', 'LN2', 'Ar', 'Kr'],
conc2: float,
pressure: float,
) -> None:
"""Fill an ionization chamber with the specified gas mixture.
Args:
detector (Literal['ic0', 'ic1', 'ic2']) : Detector
gas1 (Literal['He', 'LN2', 'Ar', 'Kr']) : Gas 1 requirement,
conc1 (float) : Concentration 1 requirement in %,
gas2 (Literal['He', 'LN2', 'Ar', 'Kr']) : Gas 2 requirement,
conc2 (float) : Concentration 2 requirement in %,
pressure (float) : Required pressure in bar abs,
"""
if 100 < conc1 < 0:
raise ValueError('Concentration 1 out of range [0 .. 100 %]')
if 100 < conc2 < 0:
raise ValueError('Concentration 2 out of range [0 .. 100 %]')
if 3 < pressure < 0:
raise ValueError('Pressure out of range [0 .. 3 bar abs]')
ch_gas1_req = None
ch_conc1_req = None
ch_gas2_req = None
ch_conc2_req = None
ch_press_req = None
ch_fill = None
ch_status = None
match detector:
case 'ic0':
ch_gas1_req = self.ic0_gas1_req
ch_conc1_req = self.ic0_conc1_req
ch_gas2_req = self.ic0_gas2_req
ch_conc2_req = self.ic0_conc2_req
ch_press_req = self.ic0_press_req
ch_fill = self.ic0_fill
ch_status = self.ic0_status
case 'ic1':
ch_gas1_req = self.ic1_gas1_req
ch_conc1_req = self.ic1_conc1_req
ch_gas2_req = self.ic1_gas2_req
ch_conc2_req = self.ic1_conc2_req
ch_press_req = self.ic1_press_req
ch_fill = self.ic1_fill
ch_status = self.ic1_status
case 'ic2':
ch_gas1_req = self.ic2_gas1_req
ch_conc1_req = self.ic2_conc1_req
ch_gas2_req = self.ic2_gas2_req
ch_conc2_req = self.ic2_conc2_req
ch_press_req = self.ic2_press_req
ch_fill = self.ic2_fill
ch_status = self.ic2_status
ch_gas1_req.put(gas1)
ch_conc1_req.put(conc1)
ch_gas2_req.put(gas2)
ch_conc2_req.put(conc2)
ch_press_req.put(pressure)
time.sleep(0.5)
ch_fill.put(1)
time.sleep(1)
# Wait until ionization chamber is filled successfully
if not self.wait_for_signals(
signal_conditions=[(ch_status.get, 1)],
timeout=self.timeout_for_pvwait,
check_stopped=True,
):
raise TimeoutError(
f"Ionization chamber still not filled after {self.timeout_for_pvwait} seconds, check caqtdm panel"
)

View File

@@ -0,0 +1,165 @@
""" ES HV power supplies"""
from typing import Literal
from ophyd import Component as Cpt
from ophyd import Device, Kind, EpicsSignalWithRBV, EpicsSignal, EpicsSignalRO
from ophyd_devices.utils import bec_utils
class Amplifiers(Device):
"""Class for the ES HV power supplies"""
USER_ACCESS = ['set_ic']
# IC0
ic0_ext_ena = Cpt(
EpicsSignalRO, suffix="ES1-IC0:HV-Ext-Ena", kind="config", doc='External enable signal of HV IC0'
)
ic0_ena = Cpt(
EpicsSignalWithRBV, suffix="ES1-IC0:HV-Ena", kind="config", doc='Enable signal of HV IC0'
)
ic0_hv_v = Cpt(
EpicsSignal, suffix="ES1-IC0:HV1-VSet", kind="config", doc='HV voltage of IC0'
)
ic0_hv_i = Cpt(
EpicsSignal, suffix="ES1-IC0:HV1-V-RB", kind="config", doc='HV current of IC0'
)
ic0_grid_v = Cpt(
EpicsSignal, suffix="ES1-IC0:HV2-VSet", kind="config", doc='Grid voltage of IC0'
)
ic0_grid_i = Cpt(
EpicsSignal, suffix="ES1-IC0:HV2-V-RB", kind="config", doc='Grid current of IC0'
)
# IC1
ic1_ext_ena = Cpt(
EpicsSignalRO, suffix="ES2-IC12:HV-Ext-Ena", kind="config", doc='External enable signal of HV IC1/IC2'
)
ic1_ena = Cpt(
EpicsSignalWithRBV, suffix="ES2-IC12:HV-Ena", kind="config", doc='Enable signal of HV IC1/IC2'
)
ic1_hv_v = Cpt(
EpicsSignal, suffix="ES2-IC1:HV1-VSet", kind="config", doc='HV voltage of IC1'
)
ic1_hv_i = Cpt(
EpicsSignal, suffix="ES2-IC1:HV1-V-RB", kind="config", doc='HV current of IC1'
)
ic1_grid_v = Cpt(
EpicsSignal, suffix="ES2-IC1:HV2-VSet", kind="config", doc='Grid voltage of IC1'
)
ic1_grid_i = Cpt(
EpicsSignal, suffix="ES2-IC1:HV2-V-RB", kind="config", doc='Grid current of IC1'
)
# IC2
ic2_ext_ena = Cpt(
EpicsSignalRO, suffix="ES2-IC12:HV-Ext-Ena", kind="config", doc='External enable signal of HV IC1/IC2'
)
ic2_ena = Cpt(
EpicsSignalWithRBV, suffix="ES2-IC12:HV-Ena", kind="config", doc='Enable signal of HV IC1/IC2'
)
ic2_hv_v = Cpt(
EpicsSignal, suffix="ES2-IC2:HV1-VSet", kind="config", doc='HV voltage of IC2'
)
ic2_hv_i = Cpt(
EpicsSignal, suffix="ES2-IC2:HV1-V-RB", kind="config", doc='HV current of IC2'
)
ic2_grid_v = Cpt(
EpicsSignal, suffix="ES2-IC2:HV2-VSet", kind="config", doc='Grid voltage of IC2'
)
ic2_grid_i = Cpt(
EpicsSignal, suffix="ES2-IC2:HV2-V-RB", kind="config", doc='Grid current of IC2'
)
def __init__(
self, prefix="", *, name: str, kind: Kind = None, device_manager=None, parent=None, **kwargs
):
"""Initialize the Current Amplifiers.
Args:
prefix (str): EPICS prefix for the device
name (str): Name of the device
kind (Kind): Kind of the device
device_manager (DeviceManager): Device manager instance
parent (Device): Parent device
kwargs: Additional keyword arguments
"""
super().__init__(prefix, name=name, kind=kind, parent=parent, **kwargs)
self.device_manager = device_manager
self.service_cfg = None
self.timeout_for_pvwait = 2.5
self.readback.name = self.name
# Wait for connection on all components, ensure IOC is connected
self.wait_for_connection(all_signals=True, timeout=5)
if device_manager:
self.device_manager = device_manager
else:
self.device_manager = bec_utils.DMMock()
self.connector = self.device_manager.connector
def set_voltage(
self,
detector: Literal['ic0', 'ic1', 'ic2'],
hv: float,
grid: float
) -> None:
"""Configure the voltage settings of the specified detector, this will
enable the high voltage (if external enable is active)!
Args:
detector (Literal['ic0', 'ic1', 'ic2']) : Detector
hv (float) : Desired voltage for the 'HV' terminal
grid (float) : Desired voltage for the 'Grid' terminal
"""
if 3000 < hv < 0:
raise ValueError('specified HV not within range [0 .. 3000]')
if 3000 < grid < 0:
raise ValueError('specified Grid not within range [0 .. 3000]')
if grid > hv:
raise ValueError('Grid must not be higher than HV!')
ch_ena = None
ch_hv_v = None
ch_hv_i = None
ch_grid_v = None
ch_grid_i = None
match detector:
case 'ic0':
ch_ena = self.ic0_ena
ch_hv_v = self.ic0_hv_v
ch_hv_i = self.ic0_hv_i
ch_grid_v = self.ic0_grid_v
ch_grid_i = self.ic0_grid_i
case 'ic1':
ch_ena = self.ic1_ena
ch_hv_v = self.ic1_hv_v
ch_hv_i = self.ic1_hv_i
ch_grid_v = self.ic1_grid_v
ch_grid_i = self.ic1_grid_i
case 'ic2':
ch_ena = self.ic2_ena
ch_hv_v = self.ic2_hv_v
ch_hv_i = self.ic2_hv_i
ch_grid_v = self.ic2_grid_v
ch_grid_i = self.ic2_grid_i
ch_ena.put(1)
# Wait until channel is switched on
if not self.wait_for_signals(
signal_conditions=[(ch_ena.get, 1)],
timeout=self.timeout_for_pvwait,
check_stopped=True,
):
raise TimeoutError(
f"Enabling channel run into timeout after {self.timeout_for_pvwait} seconds"
)
# Set current fixed to 3 mA (max)
ch_hv_i.put(3)
ch_hv_v.put(hv)
ch_grid_i.put(3)
ch_grid_v.put(grid)

View File

@@ -1,6 +1,6 @@
""" Module for the Mo1 Bragg positioner of the Debye beamline.
"""Module for the Mo1 Bragg positioner of the Debye beamline.
The softIOC is reachable via the EPICS prefix X01DA-OP-MO1:BRAGG: and connected
to a motor controller via web sockets. The Mo1 Bragg positioner is not only a
to a motor controller via web sockets. The Mo1 Bragg positioner is not only a
positioner, but also a scan controller to setup XAS and XRD scans. A few scan modes
are programmed in the controller, e.g. simple and advanced XAS scans + XRD triggering mode.
@@ -28,14 +28,30 @@ from ophyd import (
Signal,
Staged,
)
from typeguard import typechecked
from ophyd.utils import LimitError
from ophyd_devices.utils import bec_scaninfo_mixin, bec_utils
from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError
from typeguard import typechecked
from debye_bec.devices.utils.mo1_bragg_utils import compute_spline
logger = bec_logger.logger
class TriggerControlSource(int, enum.Enum):
"""Enum class for the trigger control source of the trigger generator"""
EPICS = 0
INPOS = 1
class TriggerControlMode(int, enum.Enum):
"""Enum class for the trigger control mode of the trigger generator"""
PULSE = 0
CONDITION = 1
class ScanControlScanStatus(int, enum.Enum):
"""Enum class for the scan status of the Bragg positioner"""
@@ -51,12 +67,12 @@ class ScanControlLoadMessage(int, enum.Enum):
PENDING = 0
STARTED = 1
SUCCESS = 2
ERR_XRD_MEAS_LEN_LOW = 3
ERR_XRD_N_TRIGGERS_LOW = 4
ERR_XRD_TRIGS_EVERY_N_LOW = 5
ERR_XRD_MEAS_LEN_HI = 6
ERR_XRD_N_TRIGGERS_HI = 7
ERR_XRD_TRIGS_EVERY_N_HI = 8
ERR_TRIG_MEAS_LEN_LOW = 3
ERR_TRIG_N_TRIGGERS_LOW = 4
ERR_TRIG_TRIGS_EVERY_N_LOW = 5
ERR_TRIG_MEAS_LEN_HI = 6
ERR_TRIG_N_TRIGGERS_HI = 7
ERR_TRIG_TRIGS_EVERY_N_HI = 8
ERR_SCAN_HI_ANGLE_LIMIT = 9
ERR_SCAN_LOW_ANGLE_LIMITS = 10
ERR_SCAN_TIME = 11
@@ -138,18 +154,16 @@ class Mo1BraggCrystal(Device):
class Mo1BraggScanSettings(Device):
"""Mo1 Bragg PVs to set the scan setttings"""
# XRD settings
xrd_select_ref_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_select_ref_ENUM", kind="config")
xrd_enable_hi_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_enable_hi_ENUM", kind="config")
# TRIG settings
trig_select_ref_enum = Cpt(EpicsSignalWithRBV, suffix="trig_select_ref_ENUM", kind="config")
xrd_time_hi = Cpt(EpicsSignalWithRBV, suffix="xrd_time_hi", kind="config")
xrd_n_trigger_hi = Cpt(EpicsSignalWithRBV, suffix="xrd_n_trigger_hi", kind="config")
xrd_every_n_hi = Cpt(EpicsSignalWithRBV, suffix="xrd_every_n_hi", kind="config")
trig_ena_hi_enum = Cpt(EpicsSignalWithRBV, suffix="trig_ena_hi_ENUM", kind="config")
trig_time_hi = Cpt(EpicsSignalWithRBV, suffix="trig_time_hi", kind="config")
trig_every_n_hi = Cpt(EpicsSignalWithRBV, suffix="trig_every_n_hi", kind="config")
xrd_enable_lo_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_enable_lo_ENUM", kind="config")
xrd_time_lo = Cpt(EpicsSignalWithRBV, suffix="xrd_time_lo", kind="config")
xrd_n_trigger_lo = Cpt(EpicsSignalWithRBV, suffix="xrd_n_trigger_lo", kind="config")
xrd_every_n_lo = Cpt(EpicsSignalWithRBV, suffix="xrd_every_n_lo", kind="config")
trig_ena_lo_enum = Cpt(EpicsSignalWithRBV, suffix="trig_ena_lo_ENUM", kind="config")
trig_time_lo = Cpt(EpicsSignalWithRBV, suffix="trig_time_lo", kind="config")
trig_every_n_lo = Cpt(EpicsSignalWithRBV, suffix="trig_every_n_lo", kind="config")
# XAS simple scan settings
s_scan_angle_hi = Cpt(EpicsSignalWithRBV, suffix="s_scan_angle_hi", kind="config")
@@ -169,6 +183,34 @@ class Mo1BraggScanSettings(Device):
a_scan_vel = Cpt(EpicsSignalWithRBV, suffix="a_scan_vel", kind="config", auto_monitor=True)
a_scan_time = Cpt(EpicsSignalWithRBV, suffix="a_scan_time", kind="config", auto_monitor=True)
class Mo1TriggerSettings(Device):
"""Mo1 Trigger settings"""
settle_time = Cpt(EpicsSignalWithRBV, suffix="settle_time", kind="config")
max_dev = Cpt(EpicsSignalWithRBV, suffix="max_dev", kind="config")
xrd_trig_src_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_trig_src_ENUM", kind="config")
xrd_trig_mode_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_trig_mode_ENUM", kind="config")
xrd_trig_len = Cpt(EpicsSignalWithRBV, suffix="xrd_trig_len", kind="config")
xrd_trig_req = Cpt(EpicsSignal, suffix="xrd_trig_req", kind="config")
falcon_trig_src_enum = Cpt(EpicsSignalWithRBV, suffix="falcon_trig_src_ENUM", kind="config")
falcon_trig_mode_enum = Cpt(EpicsSignalWithRBV, suffix="falcon_trig_mode_ENUM", kind="config")
falcon_trig_len = Cpt(EpicsSignalWithRBV, suffix="falcon_trig_len", kind="config")
falcon_trig_req = Cpt(EpicsSignal, suffix="falcon_trig_req", kind="config")
univ1_trig_src_enum = Cpt(EpicsSignalWithRBV, suffix="univ1_trig_src_ENUM", kind="config")
univ1_trig_mode_enum = Cpt(EpicsSignalWithRBV, suffix="univ1_trig_mode_ENUM", kind="config")
univ1_trig_len = Cpt(EpicsSignalWithRBV, suffix="univ1_trig_len", kind="config")
univ1_trig_req = Cpt(EpicsSignal, suffix="univ1_trig_req", kind="config")
univ2_trig_src_enum = Cpt(EpicsSignalWithRBV, suffix="univ2_trig_src_ENUM", kind="config")
univ2_trig_mode_enum = Cpt(EpicsSignalWithRBV, suffix="univ2_trig_mode_ENUM", kind="config")
univ2_trig_len = Cpt(EpicsSignalWithRBV, suffix="univ2_trig_len", kind="config")
univ2_trig_req = Cpt(EpicsSignal, suffix="univ2_trig_req", kind="config")
class Mo1BraggCalculator(Device):
"""Mo1 Bragg PVs to convert angle to energy or vice-versa."""
@@ -177,6 +219,7 @@ class Mo1BraggCalculator(Device):
calc_energy = Cpt(EpicsSignalWithRBV, suffix="calc_energy", kind="config")
calc_angle = Cpt(EpicsSignalWithRBV, suffix="calc_angle", kind="config")
class Mo1BraggScanControl(Device):
"""Mo1 Bragg PVs to control the scan after setting the parameters."""
@@ -189,9 +232,7 @@ class Mo1BraggScanControl(Device):
scan_start_infinite = Cpt(
EpicsSignal, suffix="scan_start_infinite", kind="config", put_complete=True
)
scan_start_timer = Cpt(
EpicsSignal, suffix="scan_start_timer", kind="config", put_complete=True
)
scan_start_timer = Cpt(EpicsSignal, suffix="scan_start_timer", kind="config", put_complete=True)
scan_stop = Cpt(EpicsSignal, suffix="scan_stop", kind="config", put_complete=True)
scan_status = Cpt(
EpicsSignalRO, suffix="scan_status_ENUM_RBV", kind="config", auto_monitor=True
@@ -200,9 +241,7 @@ class Mo1BraggScanControl(Device):
EpicsSignalRO, suffix="scan_time_left_RBV", kind="config", auto_monitor=True
)
scan_done = Cpt(EpicsSignalRO, suffix="scan_done_RBV", kind="config", auto_monitor=True)
scan_val_reset = Cpt(
EpicsSignal, suffix="scan_val_reset", kind="config", put_complete=True
)
scan_val_reset = Cpt(EpicsSignal, suffix="scan_val_reset", kind="config", put_complete=True)
scan_progress = Cpt(EpicsSignalRO, suffix="scan_progress_RBV", kind="config", auto_monitor=True)
scan_spectra_done = Cpt(
EpicsSignalRO, suffix="scan_n_osc_RBV", kind="config", auto_monitor=True
@@ -221,10 +260,8 @@ class ScanParameter:
scan_time: float = None
scan_duration: float = None
xrd_enable_low: bool = None
xrd_enable_high: bool = None
num_trigger_low: int = None
num_trigger_high: int = None
xrd_enable_low: bool = None # trig_enable_low: bool = None
xrd_enable_high: bool = None # trig_enable_high: bool = None
exp_time_low: float = None
exp_time_high: float = None
cycle_low: int = None
@@ -241,11 +278,12 @@ class Mo1Bragg(Device, PositionerBase):
the NI motor controller via web sockets.
"""
USER_ACCESS = []
USER_ACCESS = ["set_advanced_xas_settings"]
crystal = Cpt(Mo1BraggCrystal, "")
encoder = Cpt(Mo1BraggEncoder, "")
scan_settings = Cpt(Mo1BraggScanSettings, "")
trigger_settings = Cpt(Mo1TriggerSettings, "")
calculator = Cpt(Mo1BraggCalculator, "")
scan_control = Cpt(Mo1BraggScanControl, "")
status = Cpt(Mo1BraggStatus, "")
@@ -544,16 +582,14 @@ class Mo1Bragg(Device, PositionerBase):
@typechecked
def convert_angle_energy(
self,
mode:Literal["AngleToEnergy", "EnergyToAngle"],
inp:float
self, mode: Literal["AngleToEnergy", "EnergyToAngle"], inp: float
) -> float:
"""Calculate energy to angle or vice versa
Args:
mode (Literal["AngleToEnergy", "EnergyToAngle"]): Mode of calculation
input (float): Either angle or energy
Returns:
output (float): Converted angle or energy
"""
@@ -581,25 +617,20 @@ class Mo1Bragg(Device, PositionerBase):
raise TimeoutError(
f"Timeout after {self.timeout_for_pvwait} while waiting for calc done,"
)
time.sleep(0.25) # Needed due to update frequency of softIOC
time.sleep(0.25) # Needed due to update frequency of softIOC
if mode == "AngleToEnergy":
return self.calculator.calc_energy.get()
elif mode == "EnergyToAngle":
return self.calculator.calc_angle.get()
def set_advanced_xas_settings(
self,
low: float,
high:float,
scan_time: float,
p_kink: float,
e_kink: float
self, low: float, high: float, scan_time: float, p_kink: float, e_kink: float
) -> None:
"""Set Advanced XAS parameters for upcoming scan.
Args:
low (float): Low angle value of the scan in deg
high (float): High angle value of the scan in deg
low (float): Low angle value of the scan in eV
high (float): High angle value of the scan in eV
scan_time (float): Time for a half oscillation in s
p_kink (float): Position of kink in %
e_kink (float): Energy of kink in eV
@@ -620,8 +651,8 @@ class Mo1Bragg(Device, PositionerBase):
pos, vel, dt = compute_spline(
low_deg=low_deg,
high_deg=high_deg,
p_kink =p_kink,
e_kink_deg = e_kink_deg,
p_kink=p_kink,
e_kink_deg=e_kink_deg,
scan_time=scan_time,
)
@@ -629,22 +660,20 @@ class Mo1Bragg(Device, PositionerBase):
self.scan_settings.a_scan_vel.set(vel)
self.scan_settings.a_scan_time.set(dt)
def set_xrd_settings(
def set_trig_settings(
self,
enable_low: bool,
enable_high: bool,
num_trigger_low: int,
num_trigger_high: int,
exp_time_low: int,
exp_time_high: int,
cycle_low: int,
cycle_high: int,
) -> None:
"""Set XRD settings for the upcoming scan.
"""Set TRIG settings for the upcoming scan.
Args:
enable_low (bool): Enable XRD for low energy/angle
enable_high (bool): Enable XRD for high energy/angle
enable_low (bool): Enable TRIG for low energy/angle
enable_high (bool): Enable TRIG for high energy/angle
num_trigger_low (int): Number of triggers for low energy/angle
num_trigger_high (int): Number of triggers for high energy/angle
exp_time_low (int): Exposure time for low energy/angle
@@ -652,14 +681,12 @@ class Mo1Bragg(Device, PositionerBase):
cycle_low (int): Cycle for low energy/angle
cycle_high (int): Cycle for high energy/angle
"""
self.scan_settings.xrd_enable_hi_enum.put(int(enable_high))
self.scan_settings.xrd_enable_lo_enum.put(int(enable_low))
self.scan_settings.xrd_n_trigger_hi.put(num_trigger_high)
self.scan_settings.xrd_n_trigger_lo.put(num_trigger_low)
self.scan_settings.xrd_time_hi.put(exp_time_high)
self.scan_settings.xrd_time_lo.put(exp_time_low)
self.scan_settings.xrd_every_n_hi.put(cycle_high)
self.scan_settings.xrd_every_n_lo.put(cycle_low)
self.scan_settings.trig_ena_hi_enum.put(int(enable_high))
self.scan_settings.trig_ena_lo_enum.put(int(enable_low))
self.scan_settings.trig_time_hi.put(exp_time_high)
self.scan_settings.trig_time_lo.put(exp_time_low)
self.scan_settings.trig_every_n_hi.put(cycle_high)
self.scan_settings.trig_every_n_lo.put(cycle_low)
def set_scan_control_settings(self, mode: ScanControlMode, scan_duration: float) -> None:
"""Set the scan control settings for the upcoming scan.
@@ -685,7 +712,7 @@ class Mo1Bragg(Device, PositionerBase):
def kickoff(self):
"""Kickoff the device, called from BEC."""
scan_duration = self.scan_control.scan_duration.get()
#TODO implement better logic for infinite scans, at least bring it up with Debye
# TODO implement better logic for infinite scans, at least bring it up with Debye
start_func = (
self.scan_control.scan_start_infinite.put
if scan_duration < 0.1
@@ -756,11 +783,9 @@ class Mo1Bragg(Device, PositionerBase):
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
)
self.set_xrd_settings(
self.set_trig_settings(
enable_low=False,
enable_high=False,
num_trigger_low=0,
num_trigger_high=0,
exp_time_low=0,
exp_time_high=0,
cycle_low=0,
@@ -775,11 +800,9 @@ class Mo1Bragg(Device, PositionerBase):
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
)
self.set_xrd_settings(
enable_low=self.scan_parameter.xrd_enable_low,
enable_high=self.scan_parameter.xrd_enable_high,
num_trigger_low=self.scan_parameter.num_trigger_low,
num_trigger_high=self.scan_parameter.num_trigger_high,
self.set_trig_settings(
enable_low=self.scan_parameter.xrd_enable_low, # enable_low=self.scan_parameter.trig_enable_low,
enable_high=self.scan_parameter.xrd_enable_high, # enable_high=self.scan_parameter.trig_enable_high,
exp_time_low=self.scan_parameter.exp_time_low,
exp_time_high=self.scan_parameter.exp_time_high,
cycle_low=self.scan_parameter.cycle_low,
@@ -796,11 +819,9 @@ class Mo1Bragg(Device, PositionerBase):
p_kink=self.scan_parameter.p_kink,
e_kink=self.scan_parameter.e_kink,
)
self.set_xrd_settings(
self.set_trig_settings(
enable_low=False,
enable_high=False,
num_trigger_low=0,
num_trigger_high=0,
exp_time_low=0,
exp_time_high=0,
cycle_low=0,
@@ -817,11 +838,9 @@ class Mo1Bragg(Device, PositionerBase):
p_kink=self.scan_parameter.p_kink,
e_kink=self.scan_parameter.e_kink,
)
self.set_xrd_settings(
enable_low=self.scan_parameter.xrd_enable_low,
enable_high=self.scan_parameter.xrd_enable_high,
num_trigger_low=self.scan_parameter.num_trigger_low,
num_trigger_high=self.scan_parameter.num_trigger_high,
self.set_trig_settings(
enable_low=self.scan_parameter.xrd_enable_low, # enable_low=self.scan_parameter.trig_enable_low,
enable_high=self.scan_parameter.xrd_enable_high, # enable_high=self.scan_parameter.trig_enable_high,
exp_time_low=self.scan_parameter.exp_time_low,
exp_time_high=self.scan_parameter.exp_time_high,
cycle_low=self.scan_parameter.cycle_low,

View File

@@ -190,7 +190,7 @@ class NIDAQ(PSIDetectorBase):
custom_prepare_cls = NIDAQCustomMixin
def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs):
super().__init__(prefix, name=name, kind=kind, parent=parent, device_manager=device_manager, **kwargs)
super().__init__(name=name, prefix=prefix, kind=kind, parent=parent, device_manager=device_manager, **kwargs)
def set_config(

View File

@@ -0,0 +1,81 @@
""" ES2 Pilatus Curtain"""
import time
from ophyd import Component as Cpt
from ophyd import Device, Kind, EpicsSignal, EpicsSignalRO
from ophyd_devices.utils import bec_utils
class GasMixSetup(Device):
"""Class for the ES2 Pilatus Curtain"""
USER_ACCESS = ['open', 'close']
open_cover = Cpt(
EpicsSignal, suffix="OpenCover", kind="config", doc='Open Cover'
)
close_cover = Cpt(
EpicsSignal, suffix="CloseCover", kind="config", doc='Close Cover'
)
cover_is_closed = Cpt(
EpicsSignalRO, suffix="CoverIsClosed", kind="config", doc='Cover is closed'
)
cover_is_open = Cpt(
EpicsSignalRO, suffix="CoverIsOpen", kind="config", doc='Cover is open'
)
cover_is_moving = Cpt(
EpicsSignalRO, suffix="CoverIsMoving", kind="config", doc='Cover is moving'
)
cover_error = Cpt(
EpicsSignalRO, suffix="CoverError", kind="config", doc='Cover error'
)
def __init__(
self, prefix="", *, name: str, kind: Kind = None, device_manager=None, parent=None, **kwargs
):
"""Initialize the Pilatus Curtain.
Args:
prefix (str): EPICS prefix for the device
name (str): Name of the device
kind (Kind): Kind of the device
device_manager (DeviceManager): Device manager instance
parent (Device): Parent device
kwargs: Additional keyword arguments
"""
super().__init__(prefix, name=name, kind=kind, parent=parent, **kwargs)
self.device_manager = device_manager
self.service_cfg = None
self.timeout_for_pvwait = 30
self.readback.name = self.name
# Wait for connection on all components, ensure IOC is connected
self.wait_for_connection(all_signals=True, timeout=5)
if device_manager:
self.device_manager = device_manager
else:
self.device_manager = bec_utils.DMMock()
self.connector = self.device_manager.connector
def open(self) -> None:
"""Open the cover"""
self.open_cover.put(1)
while not self.cover_is_open.get():
time.sleep(0.1)
if self.cover_error.get():
raise TimeoutError('Curtain did not open successfully and is now in an error state')
def close(self) -> None:
"""Close the cover"""
self.close_cover.put(1)
while not self.cover_is_closed.get():
time.sleep(0.1)
if self.cover_error.get():
raise TimeoutError('Curtain did not close successfully and is now in an error state')

View File

@@ -12,7 +12,7 @@ classifiers = [
"Programming Language :: Python :: 3",
"Topic :: Scientific/Engineering",
]
dependencies = ["numpy ~= 1.24", "scipy", "bec_lib", "h5py", "ophyd_devices"]
dependencies = ["numpy", "scipy", "bec_lib", "h5py", "ophyd_devices"]
[project.optional-dependencies]
dev = [

View File

@@ -150,26 +150,22 @@ def test_set_xas_settings(mock_bragg):
assert dev.scan_settings.s_scan_scantime.get() == 1
def test_set_xrd_settings(mock_bragg):
def test_set_trig_settings(mock_bragg):
dev = mock_bragg
dev.set_xrd_settings(
dev.set_trig_settings(
enable_low=True,
enable_high=False,
num_trigger_low=1,
num_trigger_high=7,
exp_time_low=1,
exp_time_high=3,
exp_time_high=0.1,
exp_time_low=0.01,
cycle_low=1,
cycle_high=5,
cycle_high=3,
)
assert dev.scan_settings.xrd_enable_lo_enum.get() == True
assert dev.scan_settings.xrd_enable_hi_enum.get() == False
assert dev.scan_settings.xrd_n_trigger_lo.get() == 1
assert dev.scan_settings.xrd_n_trigger_hi.get() == 7
assert dev.scan_settings.xrd_time_lo.get() == 1
assert dev.scan_settings.xrd_time_hi.get() == 3
assert dev.scan_settings.xrd_every_n_lo.get() == 1
assert dev.scan_settings.xrd_every_n_hi.get() == 5
assert dev.scan_settings.trig_ena_lo_enum.get() == True
assert dev.scan_settings.trig_ena_hi_enum.get() == False
assert dev.scan_settings.trig_every_n_lo.get() == 1
assert dev.scan_settings.trig_every_n_hi.get() == 3
assert dev.scan_settings.trig_time_lo.get() == 0.01
assert dev.scan_settings.trig_time_hi.get() == 0.1
def test_set_control_settings(mock_bragg):
@@ -437,7 +433,7 @@ def test_stage(mock_bragg, scan_worker_mock, msg):
mock.patch.object(
mock_bragg, "set_advanced_xas_settings"
) as mock_advanced_xas_settings,
mock.patch.object(mock_bragg, "set_xrd_settings") as mock_xrd_settings,
mock.patch.object(mock_bragg, "set_trig_settings") as mock_trig_settings,
mock.patch.object(
mock_bragg, "set_scan_control_settings"
) as mock_set_scan_control_settings,
@@ -450,11 +446,9 @@ def test_stage(mock_bragg, scan_worker_mock, msg):
high=scan_status_msg.content["info"]["kwargs"]["stop"],
scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
)
assert mock_xrd_settings.call_args == mock.call(
assert mock_trig_settings.call_args == mock.call(
enable_low=False,
enable_high=False,
num_trigger_low=0,
num_trigger_high=0,
exp_time_low=0,
exp_time_high=0,
cycle_low=0,
@@ -474,17 +468,11 @@ def test_stage(mock_bragg, scan_worker_mock, msg):
high=scan_status_msg.content["info"]["kwargs"]["stop"],
scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
)
assert mock_xrd_settings.call_args == mock.call(
assert mock_trig_settings.call_args == mock.call(
enable_low=scan_status_msg.content["info"]["kwargs"]["xrd_enable_low"],
enable_high=scan_status_msg.content["info"]["kwargs"][
"xrd_enable_high"
],
num_trigger_low=scan_status_msg.content["info"]["kwargs"][
"num_trigger_low"
],
num_trigger_high=scan_status_msg.content["info"]["kwargs"][
"num_trigger_high"
],
exp_time_low=scan_status_msg.content["info"]["kwargs"]["exp_time_low"],
exp_time_high=scan_status_msg.content["info"]["kwargs"][
"exp_time_high"
@@ -508,11 +496,9 @@ def test_stage(mock_bragg, scan_worker_mock, msg):
p_kink=scan_status_msg.content["info"]["kwargs"]["p_kink"],
e_kink=scan_status_msg.content["info"]["kwargs"]["e_kink"],
)
assert mock_xrd_settings.call_args == mock.call(
assert mock_trig_settings.call_args == mock.call(
enable_low=False,
enable_high=False,
num_trigger_low=0,
num_trigger_high=0,
exp_time_low=0,
exp_time_high=0,
cycle_low=0,
@@ -534,17 +520,11 @@ def test_stage(mock_bragg, scan_worker_mock, msg):
p_kink=scan_status_msg.content["info"]["kwargs"]["p_kink"],
e_kink=scan_status_msg.content["info"]["kwargs"]["e_kink"],
)
assert mock_xrd_settings.call_args == mock.call(
assert mock_trig_settings.call_args == mock.call(
enable_low=scan_status_msg.content["info"]["kwargs"]["xrd_enable_low"],
enable_high=scan_status_msg.content["info"]["kwargs"][
"xrd_enable_high"
],
num_trigger_low=scan_status_msg.content["info"]["kwargs"][
"num_trigger_low"
],
num_trigger_high=scan_status_msg.content["info"]["kwargs"][
"num_trigger_high"
],
exp_time_low=scan_status_msg.content["info"]["kwargs"]["exp_time_low"],
exp_time_high=scan_status_msg.content["info"]["kwargs"][
"exp_time_high"