Compare commits
1 Commits
test_pseud
...
feat-csaxs
| Author | SHA1 | Date | |
|---|---|---|---|
| f8d9b55bc3 |
@@ -70,7 +70,7 @@ DLPCA200_AMPLIFIER_CONFIG: dict[str, dict] = {
|
||||
"rio_device": "galilrioesxbox",
|
||||
"description": "Beam Position Monitor 4 current amplifier",
|
||||
"channels": {
|
||||
"gain_lsb": rio_optics.analog_in.ch0, # Pin 10 -> Galil ch0
|
||||
"gain_lsb": 0, # Pin 10 -> Galil ch0
|
||||
"gain_mid": 1, # Pin 11 -> Galil ch1
|
||||
"gain_msb": 2, # Pin 12 -> Galil ch2
|
||||
"coupling": 3, # Pin 13 -> Galil ch3
|
||||
|
||||
@@ -544,66 +544,6 @@ sl5trxt:
|
||||
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
|
||||
bl_smar_stage: 5
|
||||
|
||||
sl5ch:
|
||||
description: ESbox1 slit 5 center horizontal
|
||||
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitCenter
|
||||
deviceConfig:
|
||||
left_slit: sl5trxi
|
||||
right_slit: sl5trxo
|
||||
offset: 0
|
||||
enabled: true
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
needs:
|
||||
- sl5trxi
|
||||
- sl5trxo
|
||||
|
||||
sl5wh:
|
||||
description: ESbox1 slit 5 width horizontal
|
||||
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitWidth
|
||||
deviceConfig:
|
||||
left_slit: sl5trxi
|
||||
right_slit: sl5trxo
|
||||
offset: 0
|
||||
enabled: true
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
needs:
|
||||
- sl5trxi
|
||||
- sl5trxo
|
||||
|
||||
sl5cv:
|
||||
description: ESbox1 slit 5 center vertical
|
||||
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitCenter
|
||||
deviceConfig:
|
||||
left_slit: sl5trxb
|
||||
right_slit: sl5trxt
|
||||
offset: 0
|
||||
enabled: true
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
needs:
|
||||
- sl5trxb
|
||||
- sl5trxt
|
||||
|
||||
sl5wv:
|
||||
description: ESbox1 slit 5 width vertical
|
||||
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitWidth
|
||||
deviceConfig:
|
||||
left_slit: sl5trxb
|
||||
right_slit: sl5trxt
|
||||
offset: 0
|
||||
enabled: true
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
needs:
|
||||
- sl5trxb
|
||||
- sl5trxt
|
||||
|
||||
xbimtrx:
|
||||
description: ESbox2 beam intensity monitor x movement
|
||||
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
|
||||
@@ -882,37 +822,4 @@ dettrx:
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
|
||||
####################
|
||||
### Beamstop control for flight tube
|
||||
####################
|
||||
|
||||
beamstop_control:
|
||||
description: Gain control for beamstop flightube
|
||||
deviceClass: csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl
|
||||
deviceConfig:
|
||||
gain_lsb: galilrioesft.digital_out.ch0 # Pin 10 -> Galil ch0
|
||||
gain_mid: galilrioesft.digital_out.ch1 # Pin 11 -> Galil ch1
|
||||
gain_msb: galilrioesft.digital_out.ch2 # Pin 12 -> Galil ch2
|
||||
coupling: galilrioesft.digital_out.ch3 # Pin 13 -> Galil ch3
|
||||
speed_mode: galilrioesft.digital_out.ch4 # Pin 14 -> Galil ch4
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
onFailure: retry
|
||||
needs:
|
||||
- galilrioesft
|
||||
|
||||
galilrioesft:
|
||||
description: Galil RIO for remote gain switching and slow reading FlightTube
|
||||
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
|
||||
deviceConfig:
|
||||
host: galilrioesft.psi.ch
|
||||
enabled: true
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
|
||||
|
||||
softwareTrigger: false
|
||||
@@ -199,25 +199,6 @@ xbpm1c4:
|
||||
readOnly: true
|
||||
softwareTrigger: false
|
||||
|
||||
bpm1:
|
||||
description: 'XBPM1 (frontend)'
|
||||
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
|
||||
deviceConfig:
|
||||
left_top: xbpm1c1
|
||||
right_top: xbpm1c2
|
||||
right_bot: xbpm1c3
|
||||
left_bot: xbpm1c4
|
||||
onFailure: raise
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
readOnly: true
|
||||
softwareTrigger: false
|
||||
needs:
|
||||
- xbpm1c1
|
||||
- xbpm1c2
|
||||
- xbpm1c3
|
||||
- xbpm1c4
|
||||
|
||||
############################################
|
||||
######### End of xbpm sub devices ##########
|
||||
############################################
|
||||
|
||||
@@ -68,22 +68,18 @@ ccmx:
|
||||
- cSAXS
|
||||
- optics
|
||||
|
||||
# TO BE REVIEWED, REMOVE VELOCITY WITH NEW CLASS!
|
||||
ccm_energy:
|
||||
description: 'test'
|
||||
deviceClass: ophyd_devices.devices.simple_positioner.PSISimplePositioner
|
||||
deviceConfig:
|
||||
prefix: 'X12SA-OP-CCM1:'
|
||||
override_suffixes:
|
||||
user_readback: "ENERGY-GET"
|
||||
user_setpoint: "ENERGY-SET"
|
||||
velocity: "ROTY.VELO"
|
||||
motor_done_move: "ROTY.DMOV"
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
# ccm_energy:
|
||||
# readoutPriority: baseline
|
||||
# deviceClass: ophyd_devices.devices.simple_positioner.PSIPositionerBase
|
||||
# prefix: "X12SA-OP-CCM1:"
|
||||
# override_suffixes:
|
||||
# user_readback: "ENERGY-GET"
|
||||
# user_setpoint: "ENERGY-SET"
|
||||
# velocity: "ROTY:VELO"
|
||||
# deviceTags:
|
||||
# - user motors
|
||||
# enabled: true
|
||||
# readOnly: false
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,24 +0,0 @@
|
||||
galilrioesxbox:
|
||||
description: Galil RIO for remote gain switching and slow reading ES XBox
|
||||
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
|
||||
deviceConfig:
|
||||
host: galilrioesft.psi.ch
|
||||
enabled: true
|
||||
onFailure: raise
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
bpm1:
|
||||
readoutPriority: baseline
|
||||
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
|
||||
deviceConfig:
|
||||
blade_t: galilrioesxbox.analog_in.ch0
|
||||
blade_r: galilrioesxbox.analog_in.ch1
|
||||
blade_b: galilrioesxbox.analog_in.ch2
|
||||
blade_l: galilrioesxbox.analog_in.ch3
|
||||
enabled: true
|
||||
readOnly: false
|
||||
softwareTrigger: true
|
||||
needs:
|
||||
- galilrioesxbox
|
||||
|
||||
@@ -48,6 +48,7 @@ class OMNYFastShutter(PSIDeviceBase, Device):
|
||||
def fshopen(self):
|
||||
"""Open the fast shutter."""
|
||||
if self._check_if_cSAXS_shutter_exists_in_config():
|
||||
self.shutter.put(1)
|
||||
return self.device_manager.devices["fsh"].fshopen()
|
||||
else:
|
||||
self.shutter.put(1)
|
||||
@@ -55,6 +56,7 @@ class OMNYFastShutter(PSIDeviceBase, Device):
|
||||
def fshclose(self):
|
||||
"""Close the fast shutter."""
|
||||
if self._check_if_cSAXS_shutter_exists_in_config():
|
||||
self.shutter.put(0)
|
||||
return self.device_manager.devices["fsh"].fshclose()
|
||||
else:
|
||||
self.shutter.put(0)
|
||||
|
||||
@@ -1,172 +0,0 @@
|
||||
"""Module for a BPM pseudo device that computes the position and intensity from the blade signals."""
|
||||
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Kind, Signal
|
||||
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
|
||||
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
|
||||
|
||||
|
||||
class BPM(PSIPseudoDeviceBase):
|
||||
"""BPM positioner pseudo device."""
|
||||
|
||||
# Blade signals, a,b,c,d
|
||||
left_top = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="left_top",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM left_top blade",
|
||||
)
|
||||
right_top = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="right_top",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM right_top blade",
|
||||
)
|
||||
right_bot = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="right_bot",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM right_bottom blade",
|
||||
)
|
||||
left_bot = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="left_bot",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM left_bot blade",
|
||||
)
|
||||
|
||||
# Virtual signals
|
||||
pos_x = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="pos_x",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM X position, -1 fully left, 1 fully right",
|
||||
)
|
||||
pos_y = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="pos_y",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM Y position, -1 fully bottom, 1 fully top",
|
||||
)
|
||||
diagonal = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="diagonal",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM diagonal, -1 fully diagonal left_top-right_bot, 1 fully diagonal right_top-left_bot",
|
||||
)
|
||||
intensity = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="intensity",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM intensity",
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name,
|
||||
left_top: str,
|
||||
right_top: str,
|
||||
right_bot: str,
|
||||
left_bot: str,
|
||||
device_manager=None,
|
||||
scan_info=None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
|
||||
# Get all blade signal objects from utility method
|
||||
signal_t = self.left_top.get_device_object_from_bec(
|
||||
object_name=left_top, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
signal_r = self.right_top.get_device_object_from_bec(
|
||||
object_name=right_top, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
signal_b = self.right_bot.get_device_object_from_bec(
|
||||
object_name=right_bot, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
signal_l = self.left_bot.get_device_object_from_bec(
|
||||
object_name=left_bot, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
|
||||
# Set compute methods for blade signals and virtual signals
|
||||
self.left_top.set_compute_method(self._compute_blade_signal, signal=signal_t)
|
||||
self.right_top.set_compute_method(self._compute_blade_signal, signal=signal_r)
|
||||
self.right_bot.set_compute_method(self._compute_blade_signal, signal=signal_b)
|
||||
self.left_bot.set_compute_method(self._compute_blade_signal, signal=signal_l)
|
||||
|
||||
self.intensity.set_compute_method(
|
||||
self._compute_intensity,
|
||||
left_top=self.left_top,
|
||||
right_top=self.right_top,
|
||||
right_bot=self.right_bot,
|
||||
left_bot=self.left_bot,
|
||||
)
|
||||
self.pos_x.set_compute_method(
|
||||
self._compute_pos_x,
|
||||
left_bot=self.left_bot,
|
||||
left_top=self.left_top,
|
||||
right_top=self.right_top,
|
||||
right_bot=self.right_bot,
|
||||
)
|
||||
self.pos_y.set_compute_method(
|
||||
self._compute_pos_y,
|
||||
left_bot=self.left_bot,
|
||||
left_top=self.left_top,
|
||||
right_top=self.right_top,
|
||||
right_bot=self.right_bot,
|
||||
)
|
||||
self.diagonal.set_compute_method(
|
||||
self._compute_diagonal,
|
||||
left_bot=self.left_bot,
|
||||
left_top=self.left_top,
|
||||
right_top=self.right_top,
|
||||
right_bot=self.right_bot,
|
||||
)
|
||||
|
||||
def _compute_blade_signal(self, signal: Signal) -> float:
|
||||
return signal.get()
|
||||
|
||||
def _compute_intensity(
|
||||
self, left_top: Signal, right_top: Signal, right_bot: Signal, left_bot: Signal
|
||||
) -> float:
|
||||
intensity = left_top.get() + right_top.get() + right_bot.get() + left_bot.get()
|
||||
return intensity
|
||||
|
||||
def _compute_pos_x(
|
||||
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
|
||||
) -> float:
|
||||
"""X position from -1 to 1, where -1 means beam fully on the left side, 1 means beam fully on the right side."""
|
||||
sum_left = left_bot.get() + left_top.get()
|
||||
sum_right = right_top.get() + right_bot.get()
|
||||
sum_total = sum_left + sum_right
|
||||
if sum_total == 0:
|
||||
return 0.0
|
||||
return (sum_right - sum_left) / sum_total
|
||||
|
||||
def _compute_pos_y(
|
||||
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
|
||||
) -> float:
|
||||
"""Y position from -1 to 1, where -1 means beam fully on the bottom side, 1 means beam fully on the top side."""
|
||||
sum_top = left_top.get() + right_top.get()
|
||||
sum_bot = right_bot.get() + left_bot.get()
|
||||
sum_total = sum_top + sum_bot
|
||||
if sum_total == 0:
|
||||
return 0.0
|
||||
return (sum_top - sum_bot) / sum_total
|
||||
|
||||
def _compute_diagonal(
|
||||
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
|
||||
) -> float:
|
||||
sum_diag1 = left_bot.get() + right_top.get()
|
||||
sum_diag2 = left_top.get() + right_bot.get()
|
||||
sum_total = sum_diag1 + sum_diag2
|
||||
if sum_total == 0:
|
||||
return 0.0
|
||||
return (sum_diag1 - sum_diag2) / sum_total
|
||||
@@ -1,189 +0,0 @@
|
||||
"""
|
||||
Module for controlling the BPM amplifier settings, such as gain and coupling.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Kind
|
||||
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
|
||||
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_server.device_server.devices.devicemanager import DeviceManagerDS
|
||||
from ophyd import Signal
|
||||
|
||||
_GAIN_BITS_LOW_NOISE: dict[tuple, int] = {
|
||||
(0, 0, 0): int(1e3),
|
||||
(0, 0, 1): int(1e4),
|
||||
(0, 1, 0): int(1e5),
|
||||
(0, 1, 1): int(1e6),
|
||||
(1, 0, 0): int(1e7),
|
||||
(1, 0, 1): int(1e8),
|
||||
(1, 1, 0): int(1e9),
|
||||
}
|
||||
|
||||
_GAIN_BITS_HIGH_SPEED: dict[tuple, int] = {
|
||||
(0, 0, 0): int(1e5),
|
||||
(0, 0, 1): int(1e6),
|
||||
(0, 1, 0): int(1e7),
|
||||
(0, 1, 1): int(1e8),
|
||||
(1, 0, 0): int(1e9),
|
||||
(1, 0, 1): int(1e10),
|
||||
(1, 1, 0): int(1e11),
|
||||
}
|
||||
|
||||
_GAIN_TO_BITS: dict[int, tuple] = {}
|
||||
for _bits, _gain in _GAIN_BITS_LOW_NOISE.items():
|
||||
_GAIN_TO_BITS[_gain] = (*_bits, True)
|
||||
for _bits, _gain in _GAIN_BITS_HIGH_SPEED.items():
|
||||
if _gain not in _GAIN_TO_BITS: # low-noise takes priority
|
||||
_GAIN_TO_BITS[_gain] = (*_bits, False)
|
||||
|
||||
VALID_GAINS = sorted(_GAIN_TO_BITS.keys())
|
||||
|
||||
|
||||
class BPMControl(PSIPseudoDeviceBase):
|
||||
"""
|
||||
BPM amplifier control pseudo device. It is responsible for controlling the
|
||||
gain and coupling for the BPM amplifier. It relies on signals from a device
|
||||
in BEC to be available. For cSAXS, these are most liikely to be from the
|
||||
GalilRIO device that controls the BPM amplifier.
|
||||
|
||||
Args:
|
||||
name (str): Name of the pseudo device.
|
||||
gain_lsb (str): Name of the signal in BEC that controls the LSB
|
||||
of the gain setting.
|
||||
gain_mid (str): Name of the signal in BEC that controls the MID
|
||||
bit of the gain setting.
|
||||
gain_msb (str): Name of the signal in BEC that controls the MSB
|
||||
of the gain setting.
|
||||
coupling (str): Name of the signal in BEC that controls the coupling
|
||||
setting.
|
||||
speed_mode (str): Name of the signal in BEC that controls the speed mode
|
||||
(low-noise vs high-speed) of the amplifier.
|
||||
"""
|
||||
|
||||
USER_ACCESS = ["set_gain", "set_coupling"]
|
||||
|
||||
gain = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="gain",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="Gain of the amplifier",
|
||||
)
|
||||
coupling = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="coupling",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="Coupling of the amplifier",
|
||||
)
|
||||
speed = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="speed",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="Speed of the amplifier",
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
gain_lsb: str,
|
||||
gain_mid: str,
|
||||
gain_msb: str,
|
||||
coupling: str,
|
||||
speed_mode: str,
|
||||
device_manager: DeviceManagerDS | None = None,
|
||||
scan_info: ScanInfo | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
|
||||
|
||||
# First we get all signal objects from BEC using the utility method provided by the BECProcessedSignal class.
|
||||
self._gain_lsb = self.gain.get_device_object_from_bec(
|
||||
object_name=gain_lsb, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
self._gain_mid = self.gain.get_device_object_from_bec(
|
||||
object_name=gain_mid, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
self._gain_msb = self.gain.get_device_object_from_bec(
|
||||
object_name=gain_msb, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
self._coupling = self.gain.get_device_object_from_bec(
|
||||
object_name=coupling, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
self._speed_mode = self.gain.get_device_object_from_bec(
|
||||
object_name=speed_mode, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
|
||||
# Set the compute methods for the virtual signals.
|
||||
self.gain.set_compute_method(
|
||||
self._compute_gain,
|
||||
msb=self._gain_msb,
|
||||
mid=self._gain_mid,
|
||||
lsb=self._gain_lsb,
|
||||
speed_mode=self._speed_mode,
|
||||
)
|
||||
self.coupling.set_compute_method(self._compute_coupling, coupling=self._coupling)
|
||||
self.speed.set_compute_method(self._compute_speed, speed=self._speed_mode)
|
||||
|
||||
def set_gain(
|
||||
self,
|
||||
gain: Literal[
|
||||
1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000, 100000000000
|
||||
],
|
||||
) -> None:
|
||||
"""
|
||||
Set the gain of the amplifier.
|
||||
|
||||
Args:
|
||||
gain (Literal): Must be one of 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000.
|
||||
"""
|
||||
gain_int = int(gain)
|
||||
if gain_int not in VALID_GAINS:
|
||||
raise ValueError(
|
||||
f"{self.name} received invalid gain {gain_int}, must be in {VALID_GAINS}"
|
||||
)
|
||||
|
||||
msb, mid, lsb, use_low_noise = _GAIN_TO_BITS[gain_int]
|
||||
|
||||
self._gain_msb.set(bool(msb)).wait(timeout=2)
|
||||
self._gain_lsb.set(bool(lsb)).wait(timeout=2)
|
||||
self._gain_mid.set(bool(mid)).wait(timeout=2)
|
||||
self._speed_mode.set(bool(use_low_noise))
|
||||
|
||||
def set_coupling(self, coupling: Literal["AC", "DC"]) -> None:
|
||||
"""
|
||||
Set the coupling of the amplifier.
|
||||
|
||||
Args:
|
||||
coupling (Literal): Must be either "AC" or "DC".
|
||||
"""
|
||||
if coupling not in ["AC", "DC"]:
|
||||
raise ValueError(
|
||||
f"{self.name} received invalid coupling value {coupling}, please use 'AC' or 'DC'"
|
||||
)
|
||||
self._coupling.set(coupling == "DC").wait(timeout=2)
|
||||
|
||||
def _compute_gain(self, msb: Signal, mid: Signal, lsb: Signal, speed_mode: Signal) -> int:
|
||||
"""Compute the gain based on the bits and speed mode."""
|
||||
bits = (msb.get(), mid.get(), lsb.get())
|
||||
speed_mode = speed_mode.get()
|
||||
if speed_mode:
|
||||
return _GAIN_BITS_LOW_NOISE.get(bits)
|
||||
else:
|
||||
return _GAIN_BITS_HIGH_SPEED.get(bits)
|
||||
|
||||
def _compute_coupling(self, coupling: Signal) -> str:
|
||||
"""Compute the coupling based on the signal."""
|
||||
return "DC" if coupling.get() else "AC"
|
||||
|
||||
def _compute_speed(self, speed: Signal) -> str:
|
||||
"""Compute the speed based on the signal."""
|
||||
return "low_speed" if speed.get() else "high_speed"
|
||||
@@ -1 +0,0 @@
|
||||
# from ophyd
|
||||
@@ -1 +1 @@
|
||||
from .csaxs_nexus import NeXus_format as cSAXS_NeXus_format
|
||||
from .csaxs_nexus import cSAXSNeXusFormat
|
||||
|
||||
@@ -1,445 +1,472 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
import numpy as np
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_lib.devicemanager import DeviceManagerBase
|
||||
from bec_server.file_writer.file_writer import HDF5Storage
|
||||
from bec_server.file_writer.default_writer import DefaultFormat
|
||||
|
||||
|
||||
def get_entry(data: dict, name: str, default=None) -> Any:
|
||||
class cSAXSNeXusFormat(DefaultFormat):
|
||||
"""
|
||||
Get an entry from the scan data assuming a <device>.<device>.value structure.
|
||||
|
||||
Args:
|
||||
data (dict): Scan data
|
||||
name (str): Entry name
|
||||
default (Any, optional): Default value. Defaults to None.
|
||||
NeXus file format for cSAXS beamline. This format is based on the default NeXus format, but with some additional entries specific to the cSAXS beamline. The structure of the file is based on the NeXus standard, but with some additional groups and datasets specific to the cSAXS beamline.
|
||||
"""
|
||||
if isinstance(data.get(name), list) and isinstance(data.get(name)[0], dict):
|
||||
return [sub_data.get(name, {}).get("value", default) for sub_data in data.get(name)]
|
||||
|
||||
return data.get(name, {}).get(name, {}).get("value", default)
|
||||
def format(self) -> None:
|
||||
"""
|
||||
Prepare the NeXus file format.
|
||||
Override this method in file writer plugins to customize the HDF5 file format.
|
||||
|
||||
The class provides access to the following attributes:
|
||||
- self.storage: The HDF5Storage object.
|
||||
- self.data: The data dictionary.
|
||||
- self.file_references: The file references dictionary, which has the link to external data.
|
||||
- self.device_manager: The DeviceManagerBase object.
|
||||
- self.get_entry(name, default=None): Helper method to get an entry from the data dictionary.
|
||||
|
||||
def NeXus_format(
|
||||
storage: HDF5Storage, data: dict, file_references: dict, device_manager: DeviceManagerBase
|
||||
) -> HDF5Storage:
|
||||
"""
|
||||
Prepare the NeXus file format.
|
||||
See also: :class:`bec_server.file_writer.file_writer.HDF5Storage`.
|
||||
|
||||
Args:
|
||||
storage (HDF5Storage): HDF5 storage. Pseudo hdf5 file container that will be written to disk later.
|
||||
data (dict): scan data
|
||||
file_references (dict): File references. Can be used to add external files to the HDF5 file. The path is given relative to the HDF5 file.
|
||||
device_manager (DeviceManagerBase): Device manager. Can be used to check if devices are available.
|
||||
"""
|
||||
|
||||
Returns:
|
||||
HDF5Storage: Updated HDF5 storage
|
||||
"""
|
||||
# /entry
|
||||
entry = storage.create_group("entry")
|
||||
entry.attrs["NX_class"] = "NXentry"
|
||||
entry.attrs["definition"] = "NXsas"
|
||||
entry.attrs["start_time"] = data.get("start_time")
|
||||
entry.attrs["end_time"] = data.get("end_time")
|
||||
entry.attrs["version"] = 1.0
|
||||
# entry = self.storage.create_group("entry")
|
||||
|
||||
# /entry/collection
|
||||
collection = entry.create_group("collection")
|
||||
collection.attrs["NX_class"] = "NXcollection"
|
||||
bec_collection = collection.create_group("bec")
|
||||
# # /entry/control
|
||||
# control = entry.create_group("control")
|
||||
# control.attrs["NX_class"] = "NXmonitor"
|
||||
# control.create_dataset(name="mode", data="monitor")
|
||||
|
||||
# /entry/control
|
||||
control = entry.create_group("control")
|
||||
control.attrs["NX_class"] = "NXmonitor"
|
||||
control.create_dataset(name="mode", data="monitor")
|
||||
control.create_dataset(name="integral", data=get_entry(data, "bpm4i"))
|
||||
# #########
|
||||
# # EXAMPLE for soft link
|
||||
# #########
|
||||
# # /entry/data
|
||||
# if "eiger_4" in self.device_manager.devices:
|
||||
# entry.create_soft_link(name="data", target="/entry/instrument/eiger_4")
|
||||
|
||||
# /entry/data
|
||||
main_data = entry.create_group("data")
|
||||
main_data.attrs["NX_class"] = "NXdata"
|
||||
if "eiger_4" in device_manager.devices:
|
||||
main_data.create_soft_link(name="data", target="/entry/instrument/eiger_4/data")
|
||||
elif "eiger9m" in device_manager.devices:
|
||||
main_data.create_soft_link(name="data", target="/entry/instrument/eiger9m/data")
|
||||
elif "pilatus_2" in device_manager.devices:
|
||||
main_data.create_soft_link(name="data", target="/entry/instrument/pilatus_2/data")
|
||||
# ########
|
||||
# # EXAMPLE for external link
|
||||
# ########
|
||||
# # control = entry.create_group("sample")
|
||||
# # control.create_ext_link("data", self.file_references["eiger9m"]["path"], "EG9M/data")
|
||||
|
||||
# /entry/sample
|
||||
control = entry.create_group("sample")
|
||||
control.attrs["NX_class"] = "NXsample"
|
||||
control.create_dataset(name="name", data=get_entry(data, "samplename"))
|
||||
control.create_dataset(name="description", data=data.get("sample_description"))
|
||||
x_translation = control.create_dataset(name="x_translation", data=get_entry(data, "samx"))
|
||||
x_translation.attrs["units"] = "mm"
|
||||
y_translation = control.create_dataset(name="y_translation", data=get_entry(data, "samy"))
|
||||
y_translation.attrs["units"] = "mm"
|
||||
temperature_log = control.create_dataset(name="temperature_log", data=get_entry(data, "temp"))
|
||||
temperature_log.attrs["units"] = "K"
|
||||
# # /entry/sample
|
||||
# control = entry.create_group("sample")
|
||||
# control.attrs["NX_class"] = "NXsample"
|
||||
# control.create_dataset(name="name", data=self.data.get("samplename"))
|
||||
# control.create_dataset(name="description", data=self.data.get("sample_description"))
|
||||
|
||||
# /entry/instrument
|
||||
instrument = entry.create_group("instrument")
|
||||
instrument.attrs["NX_class"] = "NXinstrument"
|
||||
instrument.create_dataset(name="name", data="cSAXS beamline")
|
||||
# # /entry/instrument
|
||||
# instrument = entry.create_group("instrument")
|
||||
# instrument.attrs["NX_class"] = "NXinstrument"
|
||||
|
||||
source = instrument.create_group("source")
|
||||
source.attrs["NX_class"] = "NXsource"
|
||||
source.create_dataset(name="type", data="Synchrotron X-ray Source")
|
||||
source.create_dataset(name="name", data="Swiss Light Source")
|
||||
source.create_dataset(name="probe", data="x-ray")
|
||||
distance = source.create_dataset(
|
||||
name="distance", data=-33800 - np.asarray(get_entry(data, "samz", 0))
|
||||
)
|
||||
distance.attrs["units"] = "mm"
|
||||
sigma_x = source.create_dataset(name="sigma_x", data=0.202)
|
||||
sigma_x.attrs["units"] = "mm"
|
||||
sigma_y = source.create_dataset(name="sigma_y", data=0.018)
|
||||
sigma_y.attrs["units"] = "mm"
|
||||
divergence_x = source.create_dataset(name="divergence_x", data=0.000135)
|
||||
divergence_x.attrs["units"] = "radians"
|
||||
divergence_y = source.create_dataset(name="divergence_y", data=0.000025)
|
||||
divergence_y.attrs["units"] = "radians"
|
||||
current = source.create_dataset(name="current", data=get_entry(data, "curr"))
|
||||
current.attrs["units"] = "mA"
|
||||
# source = instrument.create_group("source")
|
||||
# source.attrs["NX_class"] = "NXsource"
|
||||
# source.create_dataset(name="type", data="Synchrotron X-ray Source")
|
||||
# source.create_dataset(name="name", data="Swiss Light Source")
|
||||
# source.create_dataset(name="probe", data="x-ray")
|
||||
|
||||
insertion_device = instrument.create_group("insertion_device")
|
||||
insertion_device.attrs["NX_class"] = "NXinsertion_device"
|
||||
source.create_dataset(name="type", data="undulator")
|
||||
gap = source.create_dataset(name="gap", data=get_entry(data, "idgap"))
|
||||
gap.attrs["units"] = "mm"
|
||||
k = source.create_dataset(name="k", data=2.46)
|
||||
k.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
length = source.create_dataset(name="length", data=1820)
|
||||
length.attrs["units"] = "mm"
|
||||
# # /entry
|
||||
# entry = self.storage.create_group("entry")
|
||||
# entry.attrs["NX_class"] = "NXentry"
|
||||
# entry.attrs["definition"] = "NXsas"
|
||||
# entry.attrs["start_time"] = self.data.get("start_time")
|
||||
# entry.attrs["end_time"] = self.data.get("end_time")
|
||||
# entry.attrs["version"] = 1.0
|
||||
|
||||
slit_0 = instrument.create_group("slit_0")
|
||||
slit_0.attrs["NX_class"] = "NXslit"
|
||||
source.create_dataset(name="material", data="OFHC Cu")
|
||||
source.create_dataset(name="description", data="Horizontal secondary source slit")
|
||||
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl0wh"))
|
||||
x_gap.attrs["units"] = "mm"
|
||||
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl0ch"))
|
||||
x_translation.attrs["units"] = "mm"
|
||||
distance = source.create_dataset(
|
||||
name="distance", data=-21700 - np.asarray(get_entry(data, "samz", 0))
|
||||
)
|
||||
distance.attrs["units"] = "mm"
|
||||
# # /entry/control
|
||||
# control = entry.create_group("control")
|
||||
# control.attrs["NX_class"] = "NXmonitor"
|
||||
# control.create_dataset(name="mode", data="monitor")
|
||||
# control.create_dataset(name="integral", data=self.get_entry("bpm4i"))
|
||||
|
||||
slit_1 = instrument.create_group("slit_1")
|
||||
slit_1.attrs["NX_class"] = "NXslit"
|
||||
source.create_dataset(name="material", data="OFHC Cu")
|
||||
source.create_dataset(name="description", data="Horizontal secondary source slit")
|
||||
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl1wh"))
|
||||
x_gap.attrs["units"] = "mm"
|
||||
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl1wv"))
|
||||
y_gap.attrs["units"] = "mm"
|
||||
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl1ch"))
|
||||
x_translation.attrs["units"] = "mm"
|
||||
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl1ch"))
|
||||
height.attrs["units"] = "mm"
|
||||
distance = source.create_dataset(
|
||||
name="distance", data=-7800 - np.asarray(get_entry(data, "samz", 0))
|
||||
)
|
||||
distance.attrs["units"] = "mm"
|
||||
# # /entry/data
|
||||
# main_data = entry.create_group("data")
|
||||
# main_data.attrs["NX_class"] = "NXdata"
|
||||
# if "eiger_4" in self.device_manager.devices:
|
||||
# main_data.create_soft_link(name="data", target="/entry/instrument/eiger_4/data")
|
||||
# elif "eiger9m" in self.device_manager.devices:
|
||||
# main_data.create_soft_link(name="data", target="/entry/instrument/eiger9m/data")
|
||||
# elif "pilatus_2" in self.device_manager.devices:
|
||||
# main_data.create_soft_link(name="data", target="/entry/instrument/pilatus_2/data")
|
||||
|
||||
mono = instrument.create_group("monochromator")
|
||||
mono.attrs["NX_class"] = "NXmonochromator"
|
||||
mokev = data.get("mokev", {})
|
||||
if mokev:
|
||||
if isinstance(mokev, list):
|
||||
mokev = mokev[0]
|
||||
wavelength = mono.create_dataset(
|
||||
name="wavelength", data=12.3984193 / (mokev.get("mokev").get("value") + 1e-9)
|
||||
)
|
||||
wavelength.attrs["units"] = "Angstrom"
|
||||
energy = mono.create_dataset(name="energy", data=mokev.get("mokev").get("value"))
|
||||
energy.attrs["units"] = "keV"
|
||||
mono.create_dataset(name="type", data="Double crystal fixed exit monochromator.")
|
||||
distance = mono.create_dataset(
|
||||
name="distance", data=-5220 - np.asarray(get_entry(data, "samz", 0))
|
||||
)
|
||||
distance.attrs["units"] = "mm"
|
||||
# # /entry/sample
|
||||
# control = entry.create_group("sample")
|
||||
# control.attrs["NX_class"] = "NXsample"
|
||||
# control.create_dataset(name="name", data=self.get_entry("samplename"))
|
||||
# control.create_dataset(name="description", data=self.data.get("sample_description"))
|
||||
# x_translation = control.create_dataset(name="x_translation", data=self.get_entry("samx"))
|
||||
# x_translation.attrs["units"] = "mm"
|
||||
# y_translation = control.create_dataset(name="y_translation", data=self.get_entry("samy"))
|
||||
# y_translation.attrs["units"] = "mm"
|
||||
# temperature_log = control.create_dataset(
|
||||
# name="temperature_log", data=self.get_entry("temp")
|
||||
# )
|
||||
# temperature_log.attrs["units"] = "K"
|
||||
|
||||
crystal_1 = mono.create_group("crystal_1")
|
||||
crystal_1.attrs["NX_class"] = "NXcrystal"
|
||||
crystal_1.create_dataset(name="usage", data="Bragg")
|
||||
crystal_1.create_dataset(name="order_no", data="1")
|
||||
crystal_1.create_dataset(name="reflection", data="[1 1 1]")
|
||||
bragg_angle = crystal_1.create_dataset(name="bragg_angle", data=get_entry(data, "moth1"))
|
||||
bragg_angle.attrs["units"] = "degrees"
|
||||
# # /entry/instrument
|
||||
# instrument = entry.create_group("instrument")
|
||||
# instrument.attrs["NX_class"] = "NXinstrument"
|
||||
# instrument.create_dataset(name="name", data="cSAXS beamline")
|
||||
|
||||
crystal_2 = mono.create_group("crystal_2")
|
||||
crystal_2.attrs["NX_class"] = "NXcrystal"
|
||||
crystal_2.create_dataset(name="usage", data="Bragg")
|
||||
crystal_2.create_dataset(name="order_no", data="2")
|
||||
crystal_2.create_dataset(name="reflection", data="[1 1 1]")
|
||||
bragg_angle = crystal_2.create_dataset(name="bragg_angle", data=get_entry(data, "moth1"))
|
||||
bragg_angle.attrs["units"] = "degrees"
|
||||
bend_x = crystal_2.create_dataset(name="bend_x", data=get_entry(data, "mobd"))
|
||||
bend_x.attrs["units"] = "degrees"
|
||||
# source = instrument.create_group("source")
|
||||
# source.attrs["NX_class"] = "NXsource"
|
||||
# source.create_dataset(name="type", data="Synchrotron X-ray Source")
|
||||
# source.create_dataset(name="name", data="Swiss Light Source")
|
||||
# source.create_dataset(name="probe", data="x-ray")
|
||||
# distance = source.create_dataset(
|
||||
# name="distance", data=-33800 - np.asarray(self.get_entry("samz", 0))
|
||||
# )
|
||||
# distance.attrs["units"] = "mm"
|
||||
# sigma_x = source.create_dataset(name="sigma_x", data=0.202)
|
||||
# sigma_x.attrs["units"] = "mm"
|
||||
# sigma_y = source.create_dataset(name="sigma_y", data=0.018)
|
||||
# sigma_y.attrs["units"] = "mm"
|
||||
# divergence_x = source.create_dataset(name="divergence_x", data=0.000135)
|
||||
# divergence_x.attrs["units"] = "radians"
|
||||
# divergence_y = source.create_dataset(name="divergence_y", data=0.000025)
|
||||
# divergence_y.attrs["units"] = "radians"
|
||||
# current = source.create_dataset(name="current", data=self.get_entry("curr"))
|
||||
# current.attrs["units"] = "mA"
|
||||
|
||||
xbpm4 = instrument.create_group("XBPM4")
|
||||
xbpm4.attrs["NX_class"] = "NXdetector"
|
||||
xbpm4_sum = xbpm4.create_group("XBPM4_sum")
|
||||
xbpm4_sum_data = xbpm4_sum.create_dataset(name="data", data=get_entry(data, "bpm4s"))
|
||||
xbpm4_sum_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
xbpm4_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.")
|
||||
xbpm4_x = xbpm4.create_group("XBPM4_x")
|
||||
xbpm4_x_data = xbpm4_x.create_dataset(name="data", data=get_entry(data, "bpm4x"))
|
||||
xbpm4_x_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
xbpm4_x.create_dataset(
|
||||
name="description", data="Normalized difference of counts between left and right quadrants."
|
||||
)
|
||||
xbpm4_y = xbpm4.create_group("XBPM4_y")
|
||||
xbpm4_y_data = xbpm4_y.create_dataset(name="data", data=get_entry(data, "bpm4y"))
|
||||
xbpm4_y_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
xbpm4_y.create_dataset(
|
||||
name="description", data="Normalized difference of counts between high and low quadrants."
|
||||
)
|
||||
xbpm4_skew = xbpm4.create_group("XBPM4_skew")
|
||||
xbpm4_skew_data = xbpm4_skew.create_dataset(name="data", data=get_entry(data, "bpm4z"))
|
||||
xbpm4_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
xbpm4_skew.create_dataset(
|
||||
name="description", data="Normalized difference of counts between diagonal quadrants."
|
||||
)
|
||||
# insertion_device = instrument.create_group("insertion_device")
|
||||
# insertion_device.attrs["NX_class"] = "NXinsertion_device"
|
||||
# source.create_dataset(name="type", data="undulator")
|
||||
# gap = source.create_dataset(name="gap", data=self.get_entry("idgap"))
|
||||
# gap.attrs["units"] = "mm"
|
||||
# k = source.create_dataset(name="k", data=2.46)
|
||||
# k.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# length = source.create_dataset(name="length", data=1820)
|
||||
# length.attrs["units"] = "mm"
|
||||
|
||||
mirror = instrument.create_group("mirror")
|
||||
mirror.attrs["NX_class"] = "NXmirror"
|
||||
mirror.create_dataset(name="type", data="single")
|
||||
mirror.create_dataset(
|
||||
name="description",
|
||||
data="Grazing incidence mirror to reject high-harmonic wavelengths from the monochromator. There are three coating options available that are used depending on the X-ray energy, no coating (SiO2), rhodium (Rh) or platinum (Pt).",
|
||||
)
|
||||
incident_angle = mirror.create_dataset(name="incident_angle", data=get_entry(data, "mith"))
|
||||
incident_angle.attrs["units"] = "degrees"
|
||||
substrate_material = mirror.create_dataset(name="substrate_material", data="SiO2")
|
||||
substrate_material.attrs["units"] = "NX_CHAR"
|
||||
coating_material = mirror.create_dataset(name="coating_material", data="SiO2")
|
||||
coating_material.attrs["units"] = "NX_CHAR"
|
||||
bend_y = mirror.create_dataset(name="bend_y", data="mibd")
|
||||
bend_y.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
distance = mirror.create_dataset(
|
||||
name="distance", data=-4370 - np.asarray(get_entry(data, "samz", 0))
|
||||
)
|
||||
distance.attrs["units"] = "mm"
|
||||
# slit_0 = instrument.create_group("slit_0")
|
||||
# slit_0.attrs["NX_class"] = "NXslit"
|
||||
# source.create_dataset(name="material", data="OFHC Cu")
|
||||
# source.create_dataset(name="description", data="Horizontal secondary source slit")
|
||||
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl0wh"))
|
||||
# x_gap.attrs["units"] = "mm"
|
||||
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl0ch"))
|
||||
# x_translation.attrs["units"] = "mm"
|
||||
# distance = source.create_dataset(
|
||||
# name="distance", data=-21700 - np.asarray(self.get_entry("samz", 0))
|
||||
# )
|
||||
# distance.attrs["units"] = "mm"
|
||||
|
||||
xbpm5 = instrument.create_group("XBPM5")
|
||||
xbpm5.attrs["NX_class"] = "NXdetector"
|
||||
xbpm5_sum = xbpm5.create_group("XBPM5_sum")
|
||||
xbpm5_sum_data = xbpm5_sum.create_dataset(name="data", data=get_entry(data, "bpm5s"))
|
||||
xbpm5_sum_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
xbpm5_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.")
|
||||
xbpm5_x = xbpm5.create_group("XBPM5_x")
|
||||
xbpm5_x_data = xbpm5_x.create_dataset(name="data", data=get_entry(data, "bpm5x"))
|
||||
xbpm5_x_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
xbpm5_x.create_dataset(
|
||||
name="description", data="Normalized difference of counts between left and right quadrants."
|
||||
)
|
||||
xbpm5_y = xbpm5.create_group("XBPM5_y")
|
||||
xbpm5_y_data = xbpm5_y.create_dataset(name="data", data=get_entry(data, "bpm5y"))
|
||||
xbpm5_y_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
xbpm5_y.create_dataset(
|
||||
name="description", data="Normalized difference of counts between high and low quadrants."
|
||||
)
|
||||
xbpm5_skew = xbpm5.create_group("XBPM5_skew")
|
||||
xbpm5_skew_data = xbpm5_skew.create_dataset(name="data", data=get_entry(data, "bpm5z"))
|
||||
xbpm5_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
xbpm5_skew.create_dataset(
|
||||
name="description", data="Normalized difference of counts between diagonal quadrants."
|
||||
)
|
||||
# slit_1 = instrument.create_group("slit_1")
|
||||
# slit_1.attrs["NX_class"] = "NXslit"
|
||||
# source.create_dataset(name="material", data="OFHC Cu")
|
||||
# source.create_dataset(name="description", data="Horizontal secondary source slit")
|
||||
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl1wh"))
|
||||
# x_gap.attrs["units"] = "mm"
|
||||
# y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl1wv"))
|
||||
# y_gap.attrs["units"] = "mm"
|
||||
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl1ch"))
|
||||
# x_translation.attrs["units"] = "mm"
|
||||
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl1ch"))
|
||||
# height.attrs["units"] = "mm"
|
||||
# distance = source.create_dataset(
|
||||
# name="distance", data=-7800 - np.asarray(self.get_entry("samz", 0))
|
||||
# )
|
||||
# distance.attrs["units"] = "mm"
|
||||
|
||||
slit_2 = instrument.create_group("slit_2")
|
||||
slit_2.attrs["NX_class"] = "NXslit"
|
||||
source.create_dataset(name="material", data="Ag")
|
||||
source.create_dataset(name="description", data="Slit 2, optics hutch")
|
||||
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl2wh"))
|
||||
x_gap.attrs["units"] = "mm"
|
||||
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl2wv"))
|
||||
y_gap.attrs["units"] = "mm"
|
||||
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl2ch"))
|
||||
x_translation.attrs["units"] = "mm"
|
||||
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl2cv"))
|
||||
height.attrs["units"] = "mm"
|
||||
distance = source.create_dataset(
|
||||
name="distance", data=-3140 - np.asarray(get_entry(data, "samz", 0))
|
||||
)
|
||||
distance.attrs["units"] = "mm"
|
||||
# mono = instrument.create_group("monochromator")
|
||||
# mono.attrs["NX_class"] = "NXmonochromator"
|
||||
# mokev = self.data.get("mokev", {})
|
||||
# if mokev:
|
||||
# if isinstance(mokev, list):
|
||||
# mokev = mokev[0]
|
||||
# wavelength = mono.create_dataset(
|
||||
# name="wavelength", data=12.3984193 / (mokev.get("mokev").get("value") + 1e-9)
|
||||
# )
|
||||
# wavelength.attrs["units"] = "Angstrom"
|
||||
# energy = mono.create_dataset(name="energy", data=mokev.get("mokev").get("value"))
|
||||
# energy.attrs["units"] = "keV"
|
||||
# mono.create_dataset(name="type", data="Double crystal fixed exit monochromator.")
|
||||
# distance = mono.create_dataset(
|
||||
# name="distance", data=-5220 - np.asarray(self.get_entry("samz", 0))
|
||||
# )
|
||||
# distance.attrs["units"] = "mm"
|
||||
|
||||
slit_3 = instrument.create_group("slit_3")
|
||||
slit_3.attrs["NX_class"] = "NXslit"
|
||||
source.create_dataset(name="material", data="Si")
|
||||
source.create_dataset(name="description", data="Slit 3, experimental hutch, exposure box")
|
||||
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl3wh"))
|
||||
x_gap.attrs["units"] = "mm"
|
||||
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl3wv"))
|
||||
y_gap.attrs["units"] = "mm"
|
||||
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl3ch"))
|
||||
x_translation.attrs["units"] = "mm"
|
||||
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl3cv"))
|
||||
height.attrs["units"] = "mm"
|
||||
# distance = source.create_dataset(name="distance", data=-3140 - get_entry(data, "samz", 0))
|
||||
# distance.attrs["units"] = "mm"
|
||||
# crystal_1 = mono.create_group("crystal_1")
|
||||
# crystal_1.attrs["NX_class"] = "NXcrystal"
|
||||
# crystal_1.create_dataset(name="usage", data="Bragg")
|
||||
# crystal_1.create_dataset(name="order_no", data="1")
|
||||
# crystal_1.create_dataset(name="reflection", data="[1 1 1]")
|
||||
# bragg_angle = crystal_1.create_dataset(name="bragg_angle", data=self.get_entry("moth1"))
|
||||
# bragg_angle.attrs["units"] = "degrees"
|
||||
|
||||
filter_set = instrument.create_group("filter_set")
|
||||
filter_set.attrs["NX_class"] = "NXattenuator"
|
||||
filter_set.create_dataset(name="material", data="Si")
|
||||
filter_set.create_dataset(
|
||||
name="description",
|
||||
data="The filter set consists of 4 linear stages, each with five filter positions. Additionally, each one allows for an out position to allow 'no filtering'.",
|
||||
)
|
||||
attenuator_transmission = filter_set.create_dataset(
|
||||
name="attenuator_transmission", data=10 ** get_entry(data, "ftrans", 0)
|
||||
)
|
||||
attenuator_transmission.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# crystal_2 = mono.create_group("crystal_2")
|
||||
# crystal_2.attrs["NX_class"] = "NXcrystal"
|
||||
# crystal_2.create_dataset(name="usage", data="Bragg")
|
||||
# crystal_2.create_dataset(name="order_no", data="2")
|
||||
# crystal_2.create_dataset(name="reflection", data="[1 1 1]")
|
||||
# bragg_angle = crystal_2.create_dataset(name="bragg_angle", data=self.get_entry("moth1"))
|
||||
# bragg_angle.attrs["units"] = "degrees"
|
||||
# bend_x = crystal_2.create_dataset(name="bend_x", data=self.get_entry("mobd"))
|
||||
# bend_x.attrs["units"] = "degrees"
|
||||
|
||||
slit_4 = instrument.create_group("slit_4")
|
||||
slit_4.attrs["NX_class"] = "NXslit"
|
||||
source.create_dataset(name="material", data="Si")
|
||||
source.create_dataset(name="description", data="Slit 4, experimental hutch, exposure box")
|
||||
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl4wh"))
|
||||
x_gap.attrs["units"] = "mm"
|
||||
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl4wv"))
|
||||
y_gap.attrs["units"] = "mm"
|
||||
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl4ch"))
|
||||
x_translation.attrs["units"] = "mm"
|
||||
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl4cv"))
|
||||
height.attrs["units"] = "mm"
|
||||
# distance = source.create_dataset(name="distance", data=-3140 - get_entry(data, "samz", 0))
|
||||
# distance.attrs["units"] = "mm"
|
||||
# xbpm4 = instrument.create_group("XBPM4")
|
||||
# xbpm4.attrs["NX_class"] = "NXdetector"
|
||||
# xbpm4_sum = xbpm4.create_group("XBPM4_sum")
|
||||
# xbpm4_sum_data = xbpm4_sum.create_dataset(name="data", data=self.get_entry("bpm4s"))
|
||||
# xbpm4_sum_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# xbpm4_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.")
|
||||
# xbpm4_x = xbpm4.create_group("XBPM4_x")
|
||||
# xbpm4_x_data = xbpm4_x.create_dataset(name="data", data=self.get_entry("bpm4x"))
|
||||
# xbpm4_x_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# xbpm4_x.create_dataset(
|
||||
# name="description",
|
||||
# data="Normalized difference of counts between left and right quadrants.",
|
||||
# )
|
||||
# xbpm4_y = xbpm4.create_group("XBPM4_y")
|
||||
# xbpm4_y_data = xbpm4_y.create_dataset(name="data", data=self.get_entry("bpm4y"))
|
||||
# xbpm4_y_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# xbpm4_y.create_dataset(
|
||||
# name="description",
|
||||
# data="Normalized difference of counts between high and low quadrants.",
|
||||
# )
|
||||
# xbpm4_skew = xbpm4.create_group("XBPM4_skew")
|
||||
# xbpm4_skew_data = xbpm4_skew.create_dataset(name="data", data=self.get_entry("bpm4z"))
|
||||
# xbpm4_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# xbpm4_skew.create_dataset(
|
||||
# name="description", data="Normalized difference of counts between diagonal quadrants."
|
||||
# )
|
||||
|
||||
slit_5 = instrument.create_group("slit_5")
|
||||
slit_5.attrs["NX_class"] = "NXslit"
|
||||
source.create_dataset(name="material", data="Si")
|
||||
source.create_dataset(name="description", data="Slit 5, experimental hutch, exposure box")
|
||||
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl5wh"))
|
||||
x_gap.attrs["units"] = "mm"
|
||||
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl5wv"))
|
||||
y_gap.attrs["units"] = "mm"
|
||||
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl5ch"))
|
||||
x_translation.attrs["units"] = "mm"
|
||||
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl5cv"))
|
||||
height.attrs["units"] = "mm"
|
||||
# distance = source.create_dataset(name="distance", data=-3140 - get_entry(data, "samz", 0))
|
||||
# distance.attrs["units"] = "mm"
|
||||
# mirror = instrument.create_group("mirror")
|
||||
# mirror.attrs["NX_class"] = "NXmirror"
|
||||
# mirror.create_dataset(name="type", data="single")
|
||||
# mirror.create_dataset(
|
||||
# name="description",
|
||||
# data="Grazing incidence mirror to reject high-harmonic wavelengths from the monochromator. There are three coating options available that are used depending on the X-ray energy, no coating (SiO2), rhodium (Rh) or platinum (Pt).",
|
||||
# )
|
||||
# incident_angle = mirror.create_dataset(name="incident_angle", data=self.get_entry("mith"))
|
||||
# incident_angle.attrs["units"] = "degrees"
|
||||
# substrate_material = mirror.create_dataset(name="substrate_material", data="SiO2")
|
||||
# substrate_material.attrs["units"] = "NX_CHAR"
|
||||
# coating_material = mirror.create_dataset(name="coating_material", data="SiO2")
|
||||
# coating_material.attrs["units"] = "NX_CHAR"
|
||||
# bend_y = mirror.create_dataset(name="bend_y", data="mibd")
|
||||
# bend_y.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# distance = mirror.create_dataset(
|
||||
# name="distance", data=-4370 - np.asarray(self.get_entry("samz", 0))
|
||||
# )
|
||||
# distance.attrs["units"] = "mm"
|
||||
|
||||
beam_stop_1 = instrument.create_group("beam_stop_1")
|
||||
beam_stop_1.attrs["NX_class"] = "NX_beamstop"
|
||||
beam_stop_1.create_dataset(name="description", data="circular")
|
||||
bms1_size = beam_stop_1.create_dataset(name="size", data=3)
|
||||
bms1_size.attrs["units"] = "mm"
|
||||
bms1_x = beam_stop_1.create_dataset(name="size", data=get_entry(data, "bs1x"))
|
||||
bms1_x.attrs["units"] = "mm"
|
||||
bms1_y = beam_stop_1.create_dataset(name="size", data=get_entry(data, "bs1y"))
|
||||
bms1_y.attrs["units"] = "mm"
|
||||
# xbpm5 = instrument.create_group("XBPM5")
|
||||
# xbpm5.attrs["NX_class"] = "NXdetector"
|
||||
# xbpm5_sum = xbpm5.create_group("XBPM5_sum")
|
||||
# xbpm5_sum_data = xbpm5_sum.create_dataset(name="data", data=self.get_entry("bpm5s"))
|
||||
# xbpm5_sum_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# xbpm5_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.")
|
||||
# xbpm5_x = xbpm5.create_group("XBPM5_x")
|
||||
# xbpm5_x_data = xbpm5_x.create_dataset(name="data", data=self.get_entry("bpm5x"))
|
||||
# xbpm5_x_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# xbpm5_x.create_dataset(
|
||||
# name="description",
|
||||
# data="Normalized difference of counts between left and right quadrants.",
|
||||
# )
|
||||
# xbpm5_y = xbpm5.create_group("XBPM5_y")
|
||||
# xbpm5_y_data = xbpm5_y.create_dataset(name="data", data=self.get_entry("bpm5y"))
|
||||
# xbpm5_y_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# xbpm5_y.create_dataset(
|
||||
# name="description",
|
||||
# data="Normalized difference of counts between high and low quadrants.",
|
||||
# )
|
||||
# xbpm5_skew = xbpm5.create_group("XBPM5_skew")
|
||||
# xbpm5_skew_data = xbpm5_skew.create_dataset(name="data", data=self.get_entry("bpm5z"))
|
||||
# xbpm5_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# xbpm5_skew.create_dataset(
|
||||
# name="description", data="Normalized difference of counts between diagonal quadrants."
|
||||
# )
|
||||
|
||||
beam_stop_2 = instrument.create_group("beam_stop_2")
|
||||
beam_stop_2.attrs["NX_class"] = "NX_beamstop"
|
||||
beam_stop_2.create_dataset(name="description", data="rectangular")
|
||||
bms2_size_x = beam_stop_2.create_dataset(name="size_x", data=5)
|
||||
bms2_size_x.attrs["units"] = "mm"
|
||||
bms2_size_y = beam_stop_2.create_dataset(name="size_y", data=2.25)
|
||||
bms2_size_y.attrs["units"] = "mm"
|
||||
bms2_x = beam_stop_2.create_dataset(name="size", data=get_entry(data, "bs2x"))
|
||||
bms2_x.attrs["units"] = "mm"
|
||||
bms2_y = beam_stop_2.create_dataset(name="size", data=get_entry(data, "bs2y"))
|
||||
bms2_y.attrs["units"] = "mm"
|
||||
bms2_data = beam_stop_2.create_dataset(name="data", data=get_entry(data, "diode"))
|
||||
bms2_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# slit_2 = instrument.create_group("slit_2")
|
||||
# slit_2.attrs["NX_class"] = "NXslit"
|
||||
# source.create_dataset(name="material", data="Ag")
|
||||
# source.create_dataset(name="description", data="Slit 2, optics hutch")
|
||||
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl2wh"))
|
||||
# x_gap.attrs["units"] = "mm"
|
||||
# y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl2wv"))
|
||||
# y_gap.attrs["units"] = "mm"
|
||||
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl2ch"))
|
||||
# x_translation.attrs["units"] = "mm"
|
||||
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl2cv"))
|
||||
# height.attrs["units"] = "mm"
|
||||
# distance = source.create_dataset(
|
||||
# name="distance", data=-3140 - np.asarray(self.get_entry("samz", 0))
|
||||
# )
|
||||
# distance.attrs["units"] = "mm"
|
||||
|
||||
if "eiger1p5m" in device_manager.devices and device_manager.devices.eiger1p5m.enabled:
|
||||
eiger_4 = instrument.create_group("eiger_4")
|
||||
eiger_4.attrs["NX_class"] = "NXdetector"
|
||||
x_pixel_size = eiger_4.create_dataset(name="x_pixel_size", data=75)
|
||||
x_pixel_size.attrs["units"] = "um"
|
||||
y_pixel_size = eiger_4.create_dataset(name="y_pixel_size", data=75)
|
||||
y_pixel_size.attrs["units"] = "um"
|
||||
polar_angle = eiger_4.create_dataset(name="polar_angle", data=0)
|
||||
polar_angle.attrs["units"] = "degrees"
|
||||
azimuthal_angle = eiger_4.create_dataset(name="azimuthal_angle", data=0)
|
||||
azimuthal_angle.attrs["units"] = "degrees"
|
||||
rotation_angle = eiger_4.create_dataset(name="rotation_angle", data=0)
|
||||
rotation_angle.attrs["units"] = "degrees"
|
||||
description = eiger_4.create_dataset(
|
||||
name="description", data="Single-photon counting detector, 320 micron-thick Si chip"
|
||||
)
|
||||
orientation = eiger_4.create_group("orientation")
|
||||
orientation.attrs["description"] = (
|
||||
"Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
|
||||
)
|
||||
orientation.create_dataset(name="transpose", data=1)
|
||||
orientation.create_dataset(name="rot90", data=3)
|
||||
# slit_3 = instrument.create_group("slit_3")
|
||||
# slit_3.attrs["NX_class"] = "NXslit"
|
||||
# source.create_dataset(name="material", data="Si")
|
||||
# source.create_dataset(name="description", data="Slit 3, experimental hutch, exposure box")
|
||||
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl3wh"))
|
||||
# x_gap.attrs["units"] = "mm"
|
||||
# y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl3wv"))
|
||||
# y_gap.attrs["units"] = "mm"
|
||||
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl3ch"))
|
||||
# x_translation.attrs["units"] = "mm"
|
||||
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl3cv"))
|
||||
# height.attrs["units"] = "mm"
|
||||
# # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0))
|
||||
# # distance.attrs["units"] = "mm"
|
||||
|
||||
if (
|
||||
"eiger9m" in device_manager.devices
|
||||
and device_manager.devices.eiger9m.enabled
|
||||
and "eiger9m" in file_references
|
||||
):
|
||||
eiger9m = instrument.create_group("eiger9m")
|
||||
eiger9m.attrs["NX_class"] = "NXdetector"
|
||||
x_pixel_size = eiger9m.create_dataset(name="x_pixel_size", data=75)
|
||||
x_pixel_size.attrs["units"] = "um"
|
||||
y_pixel_size = eiger9m.create_dataset(name="y_pixel_size", data=75)
|
||||
y_pixel_size.attrs["units"] = "um"
|
||||
polar_angle = eiger9m.create_dataset(name="polar_angle", data=0)
|
||||
polar_angle.attrs["units"] = "degrees"
|
||||
azimuthal_angle = eiger9m.create_dataset(name="azimuthal_angle", data=0)
|
||||
azimuthal_angle.attrs["units"] = "degrees"
|
||||
rotation_angle = eiger9m.create_dataset(name="rotation_angle", data=0)
|
||||
rotation_angle.attrs["units"] = "degrees"
|
||||
description = eiger9m.create_dataset(
|
||||
name="description", data="Eiger9M detector, in-house developed, Paul Scherrer Institute"
|
||||
)
|
||||
orientation = eiger9m.create_group("orientation")
|
||||
orientation.attrs["description"] = (
|
||||
"Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
|
||||
)
|
||||
orientation.create_dataset(name="transpose", data=1)
|
||||
orientation.create_dataset(name="rot90", data=3)
|
||||
data = eiger9m.create_ext_link("data", file_references["eiger9m"]["path"], "EG9M/data")
|
||||
status = eiger9m.create_ext_link(
|
||||
"status", file_references["eiger9m"]["path"], "EG9M/status"
|
||||
)
|
||||
# filter_set = instrument.create_group("filter_set")
|
||||
# filter_set.attrs["NX_class"] = "NXattenuator"
|
||||
# filter_set.create_dataset(name="material", data="Si")
|
||||
# filter_set.create_dataset(
|
||||
# name="description",
|
||||
# data="The filter set consists of 4 linear stages, each with five filter positions. Additionally, each one allows for an out position to allow 'no filtering'.",
|
||||
# )
|
||||
# attenuator_transmission = filter_set.create_dataset(
|
||||
# name="attenuator_transmission", data=10 ** self.get_entry("ftrans", 0)
|
||||
# )
|
||||
# attenuator_transmission.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
|
||||
if (
|
||||
"pilatus_2" in device_manager.devices
|
||||
and device_manager.devices.pilatus_2.enabled
|
||||
and "pilatus_2" in file_references
|
||||
):
|
||||
pilatus_2 = instrument.create_group("pilatus_2")
|
||||
pilatus_2.attrs["NX_class"] = "NXdetector"
|
||||
x_pixel_size = pilatus_2.create_dataset(name="x_pixel_size", data=172)
|
||||
x_pixel_size.attrs["units"] = "um"
|
||||
y_pixel_size = pilatus_2.create_dataset(name="y_pixel_size", data=172)
|
||||
y_pixel_size.attrs["units"] = "um"
|
||||
polar_angle = pilatus_2.create_dataset(name="polar_angle", data=0)
|
||||
polar_angle.attrs["units"] = "degrees"
|
||||
azimuthal_angle = pilatus_2.create_dataset(name="azimuthal_angle", data=0)
|
||||
azimuthal_angle.attrs["units"] = "degrees"
|
||||
rotation_angle = pilatus_2.create_dataset(name="rotation_angle", data=0)
|
||||
rotation_angle.attrs["units"] = "degrees"
|
||||
description = pilatus_2.create_dataset(
|
||||
name="description", data="Pilatus 300K detector, Dectris, Switzerland"
|
||||
)
|
||||
orientation = pilatus_2.create_group("orientation")
|
||||
orientation.attrs["description"] = (
|
||||
"Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
|
||||
)
|
||||
orientation.create_dataset(name="transpose", data=1)
|
||||
orientation.create_dataset(name="rot90", data=2)
|
||||
data = pilatus_2.create_ext_link(
|
||||
"data", file_references["pilatus_2"]["path"], "entry/instrument/pilatus_2/data"
|
||||
)
|
||||
# slit_4 = instrument.create_group("slit_4")
|
||||
# slit_4.attrs["NX_class"] = "NXslit"
|
||||
# source.create_dataset(name="material", data="Si")
|
||||
# source.create_dataset(name="description", data="Slit 4, experimental hutch, exposure box")
|
||||
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl4wh"))
|
||||
# x_gap.attrs["units"] = "mm"
|
||||
# y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl4wv"))
|
||||
# y_gap.attrs["units"] = "mm"
|
||||
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl4ch"))
|
||||
# x_translation.attrs["units"] = "mm"
|
||||
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl4cv"))
|
||||
# height.attrs["units"] = "mm"
|
||||
# # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0))
|
||||
# # distance.attrs["units"] = "mm"
|
||||
|
||||
if (
|
||||
"falcon" in device_manager.devices
|
||||
and device_manager.devices.falcon.enabled
|
||||
and "falcon" in file_references
|
||||
):
|
||||
falcon = instrument.create_ext_link(
|
||||
"falcon", file_references["falcon"]["path"], "entry/instrument/FalconX1"
|
||||
)
|
||||
# slit_5 = instrument.create_group("slit_5")
|
||||
# slit_5.attrs["NX_class"] = "NXslit"
|
||||
# source.create_dataset(name="material", data="Si")
|
||||
# source.create_dataset(name="description", data="Slit 5, experimental hutch, exposure box")
|
||||
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl5wh"))
|
||||
# x_gap.attrs["units"] = "mm"
|
||||
# y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl5wv"))
|
||||
# y_gap.attrs["units"] = "mm"
|
||||
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl5ch"))
|
||||
# x_translation.attrs["units"] = "mm"
|
||||
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl5cv"))
|
||||
# height.attrs["units"] = "mm"
|
||||
# # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0))
|
||||
# # distance.attrs["units"] = "mm"
|
||||
|
||||
return storage
|
||||
# beam_stop_1 = instrument.create_group("beam_stop_1")
|
||||
# beam_stop_1.attrs["NX_class"] = "NX_beamstop"
|
||||
# beam_stop_1.create_dataset(name="description", data="circular")
|
||||
# bms1_size = beam_stop_1.create_dataset(name="size", data=3)
|
||||
# bms1_size.attrs["units"] = "mm"
|
||||
# bms1_x = beam_stop_1.create_dataset(name="size", data=self.get_entry("bs1x"))
|
||||
# bms1_x.attrs["units"] = "mm"
|
||||
# bms1_y = beam_stop_1.create_dataset(name="size", data=self.get_entry("bs1y"))
|
||||
# bms1_y.attrs["units"] = "mm"
|
||||
|
||||
# beam_stop_2 = instrument.create_group("beam_stop_2")
|
||||
# beam_stop_2.attrs["NX_class"] = "NX_beamstop"
|
||||
# beam_stop_2.create_dataset(name="description", data="rectangular")
|
||||
# bms2_size_x = beam_stop_2.create_dataset(name="size_x", data=5)
|
||||
# bms2_size_x.attrs["units"] = "mm"
|
||||
# bms2_size_y = beam_stop_2.create_dataset(name="size_y", data=2.25)
|
||||
# bms2_size_y.attrs["units"] = "mm"
|
||||
# bms2_x = beam_stop_2.create_dataset(name="size", data=self.get_entry("bs2x"))
|
||||
# bms2_x.attrs["units"] = "mm"
|
||||
# bms2_y = beam_stop_2.create_dataset(name="size", data=self.get_entry("bs2y"))
|
||||
# bms2_y.attrs["units"] = "mm"
|
||||
# bms2_data = beam_stop_2.create_dataset(name="data", data=self.get_entry("diode"))
|
||||
# bms2_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
|
||||
# if (
|
||||
# "eiger1p5m" in self.device_manager.devices
|
||||
# and self.device_manager.devices.eiger1p5m.enabled
|
||||
# ):
|
||||
# eiger_4 = instrument.create_group("eiger_4")
|
||||
# eiger_4.attrs["NX_class"] = "NXdetector"
|
||||
# x_pixel_size = eiger_4.create_dataset(name="x_pixel_size", data=75)
|
||||
# x_pixel_size.attrs["units"] = "um"
|
||||
# y_pixel_size = eiger_4.create_dataset(name="y_pixel_size", data=75)
|
||||
# y_pixel_size.attrs["units"] = "um"
|
||||
# polar_angle = eiger_4.create_dataset(name="polar_angle", data=0)
|
||||
# polar_angle.attrs["units"] = "degrees"
|
||||
# azimuthal_angle = eiger_4.create_dataset(name="azimuthal_angle", data=0)
|
||||
# azimuthal_angle.attrs["units"] = "degrees"
|
||||
# rotation_angle = eiger_4.create_dataset(name="rotation_angle", data=0)
|
||||
# rotation_angle.attrs["units"] = "degrees"
|
||||
# description = eiger_4.create_dataset(
|
||||
# name="description", data="Single-photon counting detector, 320 micron-thick Si chip"
|
||||
# )
|
||||
# orientation = eiger_4.create_group("orientation")
|
||||
# orientation.attrs["description"] = (
|
||||
# "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
|
||||
# )
|
||||
# orientation.create_dataset(name="transpose", data=1)
|
||||
# orientation.create_dataset(name="rot90", data=3)
|
||||
|
||||
# if (
|
||||
# "eiger9m" in self.device_manager.devices
|
||||
# and self.device_manager.devices.eiger9m.enabled
|
||||
# and "eiger9m" in self.file_references
|
||||
# ):
|
||||
# eiger9m = instrument.create_group("eiger9m")
|
||||
# eiger9m.attrs["NX_class"] = "NXdetector"
|
||||
# x_pixel_size = eiger9m.create_dataset(name="x_pixel_size", data=75)
|
||||
# x_pixel_size.attrs["units"] = "um"
|
||||
# y_pixel_size = eiger9m.create_dataset(name="y_pixel_size", data=75)
|
||||
# y_pixel_size.attrs["units"] = "um"
|
||||
# polar_angle = eiger9m.create_dataset(name="polar_angle", data=0)
|
||||
# polar_angle.attrs["units"] = "degrees"
|
||||
# azimuthal_angle = eiger9m.create_dataset(name="azimuthal_angle", data=0)
|
||||
# azimuthal_angle.attrs["units"] = "degrees"
|
||||
# rotation_angle = eiger9m.create_dataset(name="rotation_angle", data=0)
|
||||
# rotation_angle.attrs["units"] = "degrees"
|
||||
# description = eiger9m.create_dataset(
|
||||
# name="description",
|
||||
# data="Eiger9M detector, in-house developed, Paul Scherrer Institute",
|
||||
# )
|
||||
# orientation = eiger9m.create_group("orientation")
|
||||
# orientation.attrs["description"] = (
|
||||
# "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
|
||||
# )
|
||||
# orientation.create_dataset(name="transpose", data=1)
|
||||
# orientation.create_dataset(name="rot90", data=3)
|
||||
# data = eiger9m.create_ext_link(
|
||||
# "data", self.file_references["eiger9m"]["path"], "EG9M/data"
|
||||
# )
|
||||
# status = eiger9m.create_ext_link(
|
||||
# "status", self.file_references["eiger9m"]["path"], "EG9M/status"
|
||||
# )
|
||||
|
||||
# if (
|
||||
# "pilatus_2" in self.device_manager.devices
|
||||
# and self.device_manager.devices.pilatus_2.enabled
|
||||
# and "pilatus_2" in self.file_references
|
||||
# ):
|
||||
# pilatus_2 = instrument.create_group("pilatus_2")
|
||||
# pilatus_2.attrs["NX_class"] = "NXdetector"
|
||||
# x_pixel_size = pilatus_2.create_dataset(name="x_pixel_size", data=172)
|
||||
# x_pixel_size.attrs["units"] = "um"
|
||||
# y_pixel_size = pilatus_2.create_dataset(name="y_pixel_size", data=172)
|
||||
# y_pixel_size.attrs["units"] = "um"
|
||||
# polar_angle = pilatus_2.create_dataset(name="polar_angle", data=0)
|
||||
# polar_angle.attrs["units"] = "degrees"
|
||||
# azimuthal_angle = pilatus_2.create_dataset(name="azimuthal_angle", data=0)
|
||||
# azimuthal_angle.attrs["units"] = "degrees"
|
||||
# rotation_angle = pilatus_2.create_dataset(name="rotation_angle", data=0)
|
||||
# rotation_angle.attrs["units"] = "degrees"
|
||||
# description = pilatus_2.create_dataset(
|
||||
# name="description", data="Pilatus 300K detector, Dectris, Switzerland"
|
||||
# )
|
||||
# orientation = pilatus_2.create_group("orientation")
|
||||
# orientation.attrs["description"] = (
|
||||
# "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
|
||||
# )
|
||||
# orientation.create_dataset(name="transpose", data=1)
|
||||
# orientation.create_dataset(name="rot90", data=2)
|
||||
# data = pilatus_2.create_ext_link(
|
||||
# "data", self.file_references["pilatus_2"]["path"], "entry/instrument/pilatus_2/data"
|
||||
# )
|
||||
|
||||
# if (
|
||||
# "falcon" in self.device_manager.devices
|
||||
# and self.device_manager.devices.falcon.enabled
|
||||
# and "falcon" in self.file_references
|
||||
# ):
|
||||
# falcon = instrument.create_ext_link(
|
||||
# "falcon", self.file_references["falcon"]["path"], "entry/instrument/FalconX1"
|
||||
# )
|
||||
|
||||
@@ -1,241 +0,0 @@
|
||||
"""Module to test the pseudo_device module."""
|
||||
|
||||
import pytest
|
||||
from bec_lib.atlas_models import Device
|
||||
from ophyd_devices.sim.sim_signals import SetableSignal
|
||||
|
||||
from csaxs_bec.devices.pseudo_devices.bpm import BPM
|
||||
from csaxs_bec.devices.pseudo_devices.bpm_control import _GAIN_TO_BITS, BPMControl
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patched_dm(dm_with_devices):
|
||||
# Patch missing current_session attribute in the device manager
|
||||
dm = dm_with_devices
|
||||
setattr(dm, "current_session", dm._session)
|
||||
#
|
||||
signal_lsb = SetableSignal(name="gain_lsb", value=0, kind="config")
|
||||
signal_mid = SetableSignal(name="gain_mid", value=0, kind="config")
|
||||
signal_msb = SetableSignal(name="gain_msb", value=0, kind="config")
|
||||
signal_coupling = SetableSignal(name="coupling", value=0, kind="config")
|
||||
signal_speed = SetableSignal(name="speed_mode", value=0, kind="config")
|
||||
for signal in [signal_lsb, signal_mid, signal_msb, signal_coupling, signal_speed]:
|
||||
dev_cfg = Device(
|
||||
name=signal.name,
|
||||
deviceClass="ophyd_devices.sim.sim_signals.SetableSignal",
|
||||
enabled=True,
|
||||
readoutPriority="baseline",
|
||||
)
|
||||
dm._session["devices"].append(dev_cfg.model_dump())
|
||||
dm.devices._add_device(signal.name, signal)
|
||||
return dm
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bpm_control(patched_dm):
|
||||
name = "bpm_control"
|
||||
control_config = Device(
|
||||
name=name,
|
||||
deviceClass="csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl",
|
||||
enabled=True,
|
||||
readoutPriority="baseline",
|
||||
deviceConfig={
|
||||
"gain_lsb": "gain_lsb",
|
||||
"gain_mid": "gain_mid",
|
||||
"gain_msb": "gain_msb",
|
||||
"coupling": "coupling",
|
||||
"speed_mode": "speed_mode",
|
||||
},
|
||||
needs=["gain_lsb", "gain_mid", "gain_msb", "coupling", "speed_mode"],
|
||||
)
|
||||
patched_dm._session["devices"].append(control_config.model_dump())
|
||||
try:
|
||||
control = BPMControl(
|
||||
name=name,
|
||||
gain_lsb="gain_lsb",
|
||||
gain_mid="gain_mid",
|
||||
gain_msb="gain_msb",
|
||||
coupling="coupling",
|
||||
speed_mode="speed_mode",
|
||||
device_manager=patched_dm,
|
||||
)
|
||||
patched_dm.devices._add_device(control.name, control)
|
||||
control.wait_for_connection()
|
||||
yield control
|
||||
finally:
|
||||
control.destroy()
|
||||
|
||||
|
||||
def test_bpm_control_set_gain(bpm_control):
|
||||
gain_lsb = bpm_control.device_manager.devices["gain_lsb"]
|
||||
gain_mid = bpm_control.device_manager.devices["gain_mid"]
|
||||
gain_msb = bpm_control.device_manager.devices["gain_msb"]
|
||||
coupling = bpm_control.device_manager.devices["coupling"]
|
||||
speed_mode = bpm_control.device_manager.devices["speed_mode"]
|
||||
gain_lsb.put(0)
|
||||
gain_mid.put(0)
|
||||
gain_msb.put(0)
|
||||
coupling.put(0)
|
||||
speed_mode.put(1)
|
||||
|
||||
gain = bpm_control.gain.get()
|
||||
assert _GAIN_TO_BITS.get(gain) == (0, 0, 0, speed_mode.get() == 1)
|
||||
|
||||
gain_val = 10000000
|
||||
bpm_control.set_gain(gain_val)
|
||||
assert _GAIN_TO_BITS.get(gain_val, ()) == (
|
||||
gain_msb.get(),
|
||||
gain_mid.get(),
|
||||
gain_lsb.get(),
|
||||
speed_mode.get(),
|
||||
)
|
||||
|
||||
gain_val = 100000000000
|
||||
bpm_control.set_gain(gain_val)
|
||||
assert _GAIN_TO_BITS.get(gain_val, ()) == (
|
||||
gain_msb.get(),
|
||||
gain_mid.get(),
|
||||
gain_lsb.get(),
|
||||
speed_mode.get(),
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
bpm_control.set_gain(1005.0)
|
||||
|
||||
|
||||
def test_bpm_control_set_coupling(bpm_control):
|
||||
coupling = bpm_control.device_manager.devices["coupling"]
|
||||
coupling.put(0)
|
||||
|
||||
bpm_control.coupling.get() == "AC"
|
||||
coupling.put(1)
|
||||
bpm_control.coupling.get() == "DC"
|
||||
|
||||
bpm_control.set_coupling("AC")
|
||||
assert coupling.get() == 0
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
bpm_control.set_coupling("wrong")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patched_dm_bpm(dm_with_devices):
|
||||
# Patch missing current_session attribute in the device manager
|
||||
dm = dm_with_devices
|
||||
setattr(dm, "current_session", dm._session)
|
||||
#
|
||||
left_top = SetableSignal(name="left_top", value=0, kind="config")
|
||||
right_top = SetableSignal(name="right_top", value=0, kind="config")
|
||||
right_bot = SetableSignal(name="right_bot", value=0, kind="config")
|
||||
left_bot = SetableSignal(name="left_bot", value=0, kind="config")
|
||||
for signal in [left_top, right_top, right_bot, left_bot]:
|
||||
|
||||
dev_cfg = Device(
|
||||
name=signal.name,
|
||||
deviceClass="ophyd_devices.sim.sim_signals.SetableSignal",
|
||||
enabled=True,
|
||||
readoutPriority="baseline",
|
||||
)
|
||||
dm._session["devices"].append(dev_cfg.model_dump())
|
||||
dm.devices._add_device(signal.name, signal)
|
||||
return dm
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bpm(patched_dm_bpm):
|
||||
name = "bpm"
|
||||
bpm_config = Device(
|
||||
name=name,
|
||||
deviceClass="csaxs_bec.devices.pseudo_devices.bpm.BPM",
|
||||
enabled=True,
|
||||
readoutPriority="baseline",
|
||||
deviceConfig={
|
||||
"left_top": "left_top",
|
||||
"right_top": "right_top",
|
||||
"right_bot": "right_bot",
|
||||
"left_bot": "left_bot",
|
||||
},
|
||||
needs=["left_top", "right_top", "right_bot", "left_bot"],
|
||||
)
|
||||
patched_dm_bpm._session["devices"].append(bpm_config.model_dump())
|
||||
try:
|
||||
bpm = BPM(
|
||||
name=name,
|
||||
left_top="left_top",
|
||||
right_top="right_top",
|
||||
right_bot="right_bot",
|
||||
left_bot="left_bot",
|
||||
device_manager=patched_dm_bpm,
|
||||
)
|
||||
patched_dm_bpm.devices._add_device(bpm.name, bpm)
|
||||
bpm.wait_for_connection()
|
||||
yield bpm
|
||||
finally:
|
||||
bpm.destroy()
|
||||
|
||||
|
||||
def test_bpm_positions(bpm):
|
||||
left_top = bpm.device_manager.devices["left_top"]
|
||||
right_top = bpm.device_manager.devices["right_top"]
|
||||
right_bot = bpm.device_manager.devices["right_bot"]
|
||||
left_bot = bpm.device_manager.devices["left_bot"]
|
||||
|
||||
# Test center position
|
||||
for signal in [left_top, right_top, right_bot, left_bot]:
|
||||
signal.put(1)
|
||||
assert bpm.pos_x.get() == 0
|
||||
assert bpm.pos_y.get() == 0
|
||||
|
||||
# Test fully left
|
||||
left_top.put(1)
|
||||
right_top.put(0)
|
||||
right_bot.put(0)
|
||||
left_bot.put(1)
|
||||
assert bpm.pos_x.get() == -1
|
||||
assert bpm.pos_y.get() == 0
|
||||
assert bpm.diagonal.get() == 0
|
||||
assert bpm.intensity.get() == 2
|
||||
|
||||
# Test fully right
|
||||
left_top.put(0)
|
||||
right_top.put(1)
|
||||
right_bot.put(1)
|
||||
left_bot.put(0)
|
||||
assert bpm.pos_x.get() == 1
|
||||
assert bpm.pos_y.get() == 0
|
||||
assert bpm.diagonal.get() == 0
|
||||
|
||||
# Test fully top
|
||||
left_top.put(1)
|
||||
right_top.put(1)
|
||||
right_bot.put(0)
|
||||
left_bot.put(0)
|
||||
assert bpm.pos_x.get() == 0
|
||||
assert bpm.pos_y.get() == 1
|
||||
assert bpm.diagonal.get() == 0
|
||||
|
||||
# Test fully bottom
|
||||
left_top.put(0)
|
||||
right_top.put(0)
|
||||
right_bot.put(1)
|
||||
left_bot.put(1)
|
||||
assert bpm.pos_x.get() == 0
|
||||
assert bpm.pos_y.get() == -1
|
||||
assert bpm.diagonal.get() == 0
|
||||
|
||||
# Diagonal beam
|
||||
left_top.put(1)
|
||||
right_top.put(0)
|
||||
right_bot.put(1)
|
||||
left_bot.put(0)
|
||||
assert bpm.pos_x.get() == 0
|
||||
assert bpm.pos_y.get() == 0
|
||||
assert bpm.diagonal.get() == -1
|
||||
|
||||
left_top.put(0)
|
||||
right_top.put(1)
|
||||
right_bot.put(0)
|
||||
left_bot.put(1)
|
||||
assert bpm.pos_x.get() == 0
|
||||
assert bpm.pos_y.get() == 0
|
||||
assert bpm.diagonal.get() == 1
|
||||
Reference in New Issue
Block a user