Compare commits

..

1 Commits

Author SHA1 Message Date
9f37d1da70 fix(omny/shutter): MonitorSignal wrapper with auto_monitor
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m56s
CI for csaxs_bec / test (pull_request) Successful in 1m54s
2026-03-26 14:19:09 +01:00
13 changed files with 594 additions and 3022 deletions

View File

@@ -1,89 +0,0 @@
"""
LamNI/webpage_generator.py
===========================
LamNI-specific webpage generator subclass.
Integration (inside the LamNI __init__ / startup):
---------------------------------------------------
from csaxs_bec.bec_ipython_client.plugins.LamNI.webpage_generator import (
LamniWebpageGenerator,
)
self._webpage_gen = LamniWebpageGenerator(
bec_client=client,
output_dir="~/data/raw/webpage/",
)
self._webpage_gen.start()
Or use the factory (auto-selects by session name "lamni"):
----------------------------------------------------------
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
make_webpage_generator,
)
self._webpage_gen = make_webpage_generator(bec, output_dir="~/data/raw/webpage/")
self._webpage_gen.start()
Interactive helpers:
--------------------
lamni._webpage_gen.status()
lamni._webpage_gen.verbosity = 2
lamni._webpage_gen.stop()
lamni._webpage_gen.start()
"""
from pathlib import Path
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
WebpageGeneratorBase,
_safe_get,
_safe_float,
_gvar,
)
class LamniWebpageGenerator(WebpageGeneratorBase):
"""
LamNI-specific webpage generator.
Logo: LamNI.png from the same directory as this file.
Override _collect_setup_data() to add LamNI-specific temperatures,
sample name, and measurement settings.
"""
# TODO: fill in LamNI-specific device paths
# label -> dotpath under device_manager.devices
_TEMP_MAP = {
# "Sample": "lamni_temphum.temperature_sample",
# "OSA": "lamni_temphum.temperature_osa",
}
def _logo_path(self):
return Path(__file__).parent / "LamNI.png"
def _collect_setup_data(self) -> dict:
# ── LamNI-specific data goes here ─────────────────────────────
# Uncomment and adapt when device names are known:
#
# dm = self._bec.device_manager
# sample_name = _safe_get(dm, "lamni_samples.sample_names.sample0") or "N/A"
# temperatures = {
# label: _safe_float(_safe_get(dm, path))
# for label, path in self._TEMP_MAP.items()
# }
# settings = {
# "Sample name": sample_name,
# "FOV x / y": ...,
# "Exposure time": _gvar(self._bec, "tomo_countingtime", ".3f", " s"),
# "Angle step": _gvar(self._bec, "tomo_angle_stepsize", ".2f", "\u00b0"),
# }
# return {
# "type": "lamni",
# "sample_name": sample_name,
# "temperatures": temperatures,
# "settings": settings,
# }
# Placeholder — returns minimal info until implemented
return {
"type": "lamni",
# LamNI-specific data here
}

View File

@@ -21,14 +21,6 @@ from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import (
TomoIDManager,
)
# from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
# FlomniWebpageGenerator,
# VERBOSITY_SILENT, # 0 — no output
# VERBOSITY_NORMAL, # 1 — startup / stop messages only (default)
# VERBOSITY_VERBOSE, # 2 — one-line summary per cycle
# VERBOSITY_DEBUG, # 3 — full JSON payload per cycle
# )
logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
@@ -1311,16 +1303,6 @@ class Flomni(
self.corr_angle_y_2 = []
self._progress_proxy = _ProgressProxy(self.client)
self._progress_proxy.reset()
from csaxs_bec.bec_ipython_client.plugins.flomni.flomni_webpage_generator import (
FlomniWebpageGenerator,
)
self._webpage_gen = FlomniWebpageGenerator(
bec_client=client,
output_dir="~/data/raw/webpage/",
upload_url="http://s1090968537.online.de/upload.php", # optional
)
self._webpage_gen.start()
self.OMNYTools = OMNYTools(self.client)
self.reconstructor = PtychoReconstructor(self.ptycho_reconstruct_foldername)
self.tomo_id_manager = TomoIDManager()
@@ -2267,8 +2249,8 @@ class Flomni(
+ ' 888 888 "Y88888P" 888 888 888 Y888 8888888 \n'
)
padding = 20
fovxy = f"{self.fovx:.1f}/{self.fovy:.1f}"
stitching = f"{self.stitch_x:.0f}/{self.stitch_y:.0f}"
fovxy = f"{self.fovx:.2f}/{self.fovy:.2f}"
stitching = f"{self.stitch_x:.2f}/{self.stitch_y:.2f}"
dataset_id = str(self.client.queue.next_dataset_number)
account = bec.active_account
content = [
@@ -2285,7 +2267,7 @@ class Flomni(
f"{'Exposure time:':<{padding}}{self.tomo_countingtime:>{padding}.2f}\n",
f"{'Fermat spiral step size:':<{padding}}{self.tomo_shellstep:>{padding}.2f}\n",
f"{'FOV:':<{padding}}{fovxy:>{padding}}\n",
f"{'Stitching:':<{padding}}{stitching:>{padding}}\n",
f"{'Stitching:':<{padding}}{stitching:>{padding}.0f}\n",
f"{'Number of individual sub-tomograms:':<{padding}}{8:>{padding}}\n",
f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n",
]

View File

@@ -1,96 +0,0 @@
"""
omny/webpage_generator.py
==========================
OMNY-specific webpage generator subclass.
Integration (inside the OMNY __init__ / startup):
--------------------------------------------------
from csaxs_bec.bec_ipython_client.plugins.omny.webpage_generator import (
OmnyWebpageGenerator,
)
self._webpage_gen = OmnyWebpageGenerator(
bec_client=client,
output_dir="~/data/raw/webpage/",
)
self._webpage_gen.start()
Or use the factory (auto-selects by session name "omny"):
---------------------------------------------------------
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
make_webpage_generator,
)
self._webpage_gen = make_webpage_generator(bec, output_dir="~/data/raw/webpage/")
self._webpage_gen.start()
Interactive helpers:
--------------------
omny._webpage_gen.status()
omny._webpage_gen.verbosity = 2
omny._webpage_gen.stop()
omny._webpage_gen.start()
"""
from pathlib import Path
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
WebpageGeneratorBase,
_safe_get,
_safe_float,
_gvar,
)
class OmnyWebpageGenerator(WebpageGeneratorBase):
"""
OMNY-specific webpage generator.
Logo: OMNY.png from the same directory as this file.
Override _collect_setup_data() to add OMNY-specific temperatures,
sample name, and measurement settings.
The old OMNY spec webpage showed:
- Cryo temperatures (XOMNY-TEMP-CRYO-A/B)
- Per-channel temperatures (XOMNY-TEMP1..48)
- Dewar pressure / LN2 flow
- Interferometer strengths (OINTERF)
Map these to BEC device paths below once available.
"""
# TODO: fill in OMNY-specific device paths
# label -> dotpath under device_manager.devices
_TEMP_MAP = {
# "Sample (cryo A)": "omny_temp.cryo_a",
# "Cryo head (B)": "omny_temp.cryo_b",
}
def _logo_path(self):
return Path(__file__).parent / "OMNY.png"
def _collect_setup_data(self) -> dict:
# ── OMNY-specific data goes here ──────────────────────────────
# Uncomment and adapt when device names are known:
#
# dm = self._bec.device_manager
# sample_name = _safe_get(dm, "omny_samples.sample_names.sample0") or "N/A"
# temperatures = {
# label: _safe_float(_safe_get(dm, path))
# for label, path in self._TEMP_MAP.items()
# }
# settings = {
# "Sample name": sample_name,
# "FOV x / y": ...,
# "Exposure time": _gvar(self._bec, "tomo_countingtime", ".3f", " s"),
# "Angle step": _gvar(self._bec, "tomo_angle_stepsize", ".2f", "\u00b0"),
# }
# return {
# "type": "omny",
# "sample_name": sample_name,
# "temperatures": temperatures,
# "settings": settings,
# }
# Placeholder — returns minimal info until implemented
return {
"type": "omny",
# OMNY-specific data here
}

View File

@@ -72,7 +72,7 @@ xbpm3x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -95,7 +95,7 @@ xbpm3y:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -118,7 +118,7 @@ sl3trxi:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -141,7 +141,7 @@ sl3trxo:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -164,7 +164,7 @@ sl3trxb:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -187,7 +187,7 @@ sl3trxt:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -210,7 +210,7 @@ fast_shutter_n1_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -234,7 +234,7 @@ fast_shutter_o1_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -257,7 +257,7 @@ fast_shutter_o2_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -280,7 +280,7 @@ filter_array_1_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -303,7 +303,7 @@ filter_array_2_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -326,7 +326,7 @@ filter_array_3_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -349,7 +349,7 @@ filter_array_4_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -372,7 +372,7 @@ sl4trxi:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -395,7 +395,7 @@ sl4trxo:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -418,7 +418,7 @@ sl4trxb:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -441,7 +441,7 @@ sl4trxt:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -466,7 +466,7 @@ sl5trxi:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -489,7 +489,7 @@ sl5trxo:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -512,7 +512,7 @@ sl5trxb:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -535,7 +535,7 @@ sl5trxt:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -558,7 +558,7 @@ xbimtrx:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -581,7 +581,7 @@ xbimtry:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -589,237 +589,3 @@ xbimtry:
init_position: 0
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 1
################### XBOX related ###################
# we assue the epics settings for resolution, velocity etc. are correct
# we do not overwrite from here
aptrx:
description: Aperture pinhole X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-PIN1:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
aptry:
description: Aperture pinhole Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-PIN1:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
ebtrx:
description: Exposure box aperture X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-EB:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
ebtry:
description: Exposure box aperture Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-EB:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
ebtrz:
description: Exposure box aperture Z
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-EB:TRZ1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
ebsupport:
description: Exposure box granite support Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-EH1-EB:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
fttrx1:
description: FTS1 translation X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-FTS1:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
fttry1:
description: FTS1 translation Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-FTS1:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
fttrx2:
description: FTS2 translation X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-FTS2:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
fttry2:
description: FTS2 translation Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-FTS2:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
fttrz:
description: FTS1 translation Z
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-FTS1:TRZ1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
bs1x:
description: Beamstop 1 X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-BS1:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
bs1y:
description: Beamstop 1 Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-BS1:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
bs2x:
description: Beamstop 2 X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-BS2:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
bs2y:
description: Beamstop 2 Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-BS2:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
dttrx:
description: Detector table X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-DETT:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
dttry:
description: Detector table Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-DETT:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
dttrz:
description: Detector table Z
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-DETT:TRZ1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
dettrx:
description: Detector 1 X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-DET1:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false

View File

@@ -68,106 +68,91 @@ ccmx:
- cSAXS
- optics
# ccm_energy:
# readoutPriority: baseline
# deviceClass: ophyd_devices.devices.simple_positioner.PSIPositionerBase
# prefix: "X12SA-OP-CCM1:"
# override_suffixes:
# user_readback: "ENERGY-GET"
# user_setpoint: "ENERGY-SET"
# velocity: "ROTY:VELO"
# deviceTags:
# - user motors
# enabled: true
# readOnly: false
##########################################################################
######################## SMARACT STAGES ##################################
##########################################################################
# xbpm2x:
# description: X-ray beam position monitor 1 in OPbox
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
# deviceConfig:
# axis_Id: A
# host: x12sa-eb-smaract-mcs-03.psi.ch
# limits:
# - -200
# - 200
# port: 5000
# sign: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: baseline
# connectionTimeout: 20
# userParameter:
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
# bl_smar_stage: 0
xbpm2x:
description: X-ray beam position monitor 1 in OPbox
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: A
host: x12sa-eb-smaract-mcs-03.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 0
# xbpm2y:
# description: X-ray beam position monitor 1 in OPbox
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
# deviceConfig:
# axis_Id: B
# host: x12sa-eb-smaract-mcs-03.psi.ch
# limits:
# - -200
# - 200
# port: 5000
# sign: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: baseline
# connectionTimeout: 20
# userParameter:
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
# bl_smar_stage: 1
# cu_foilx:
# description: Cu foil in OPbox
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
# deviceConfig:
# axis_Id: C
# host: x12sa-eb-smaract-mcs-03.psi.ch
# limits:
# - -200
# - 200
# port: 5000
# sign: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: baseline
# connectionTimeout: 20
# userParameter:
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
# bl_smar_stage: 2
# scinx:
# description: scintillator in OPbox
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
# deviceConfig:
# axis_Id: D
# host: x12sa-eb-smaract-mcs-03.psi.ch
# limits:
# - -200
# - 200
# port: 5000
# sign: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: baseline
# connectionTimeout: 20
# userParameter:
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
# bl_smar_stage: 3
xbpm2y:
description: X-ray beam position monitor 1 in OPbox
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: B
host: x12sa-eb-smaract-mcs-03.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 1
cu_foilx:
description: Cu foil in OPbox
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: C
host: x12sa-eb-smaract-mcs-03.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 2
scinx:
description: scintillator in OPbox
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: D
host: x12sa-eb-smaract-mcs-03.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 3
# dmm1_trx_readback_example: # This is the same template as for i.e. bpm4i

View File

@@ -10,8 +10,8 @@
endstation:
- !include ./bl_endstation.yaml
# detectors:
# - !include ./bl_detectors.yaml
detectors:
- !include ./bl_detectors.yaml
#sastt:
# - !include ./sastt.yaml

View File

@@ -395,7 +395,7 @@ rtz:
readoutPriority: on_request
connectionTimeout: 20
rt_positions:
rt_flyer:
deviceClass: csaxs_bec.devices.omny.rt.rt_flomni_ophyd.RtFlomniFlyer
deviceConfig:
host: mpc2844.psi.ch
@@ -522,18 +522,6 @@ omny_panda:
FMC_IN.VAL2.Min: cap_voltage_fzp_x_min
FMC_IN.VAL2.Max: cap_voltage_fzp_x_max
FMC_IN.VAL2.Mean: cap_voltage_fzp_x_mean
INENC1.VAL.Max: interf_st_fzp_y_max
INENC1.VAL.Mean: interf_st_fzp_y_mean
INENC1.VAL.Min: interf_st_fzp_y_min
INENC2.VAL.Max: interf_st_fzp_x_max
INENC2.VAL.Mean: interf_st_fzp_x_mean
INENC2.VAL.Min: interf_st_fzp_x_min
INENC3.VAL.Max: interf_st_rotz_max
INENC3.VAL.Mean: interf_st_rotz_mean
INENC3.VAL.Min: interf_st_rotz_min
INENC4.VAL.Max: interf_st_rotx_max
INENC4.VAL.Mean: interf_st_rotx_mean
INENC4.VAL.Min: interf_st_rotx_min
deviceTags:
- detector
enabled: true

View File

@@ -13,6 +13,14 @@ from ophyd_devices import PSIDeviceBase
logger = bec_logger.logger
class MonitorSignal(Signal):
"""A simple wrapper around ophyd Signal that automatically monitors the signal for changes."""
def __init__(self, *, name, auto_monitor=False, **kwargs):
super().__init__(name=name, **kwargs)
self.auto_monitor = auto_monitor
class OMNYFastShutter(PSIDeviceBase, Device):
"""
Fast Shutter control for OMNY setup. If started with at the beamline, it will expose
@@ -26,7 +34,7 @@ class OMNYFastShutter(PSIDeviceBase, Device):
SUB_VALUE = "value"
_default_sub = SUB_VALUE
shutter = Cpt(Signal, name="shutter")
shutter = Cpt(MonitorSignal, name="shutter", auto_monitor=True)
# -----------------------------------------------------
# User-facing shutter control functions

View File

@@ -1 +1 @@
from .csaxs_nexus import cSAXSNeXusFormat
from .csaxs_nexus import NeXus_format as cSAXS_NeXus_format

View File

@@ -1,472 +1,445 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any
import numpy as np
from bec_server.file_writer.default_writer import DefaultFormat
if TYPE_CHECKING:
from bec_lib.devicemanager import DeviceManagerBase
from bec_server.file_writer.file_writer import HDF5Storage
class cSAXSNeXusFormat(DefaultFormat):
def get_entry(data: dict, name: str, default=None) -> Any:
"""
NeXus file format for cSAXS beamline. This format is based on the default NeXus format, but with some additional entries specific to the cSAXS beamline. The structure of the file is based on the NeXus standard, but with some additional groups and datasets specific to the cSAXS beamline.
Get an entry from the scan data assuming a <device>.<device>.value structure.
Args:
data (dict): Scan data
name (str): Entry name
default (Any, optional): Default value. Defaults to None.
"""
if isinstance(data.get(name), list) and isinstance(data.get(name)[0], dict):
return [sub_data.get(name, {}).get("value", default) for sub_data in data.get(name)]
def format(self) -> None:
"""
Prepare the NeXus file format.
Override this method in file writer plugins to customize the HDF5 file format.
return data.get(name, {}).get(name, {}).get("value", default)
The class provides access to the following attributes:
- self.storage: The HDF5Storage object.
- self.data: The data dictionary.
- self.file_references: The file references dictionary, which has the link to external data.
- self.device_manager: The DeviceManagerBase object.
- self.get_entry(name, default=None): Helper method to get an entry from the data dictionary.
See also: :class:`bec_server.file_writer.file_writer.HDF5Storage`.
def NeXus_format(
storage: HDF5Storage, data: dict, file_references: dict, device_manager: DeviceManagerBase
) -> HDF5Storage:
"""
Prepare the NeXus file format.
"""
Args:
storage (HDF5Storage): HDF5 storage. Pseudo hdf5 file container that will be written to disk later.
data (dict): scan data
file_references (dict): File references. Can be used to add external files to the HDF5 file. The path is given relative to the HDF5 file.
device_manager (DeviceManagerBase): Device manager. Can be used to check if devices are available.
# entry = self.storage.create_group("entry")
Returns:
HDF5Storage: Updated HDF5 storage
"""
# /entry
entry = storage.create_group("entry")
entry.attrs["NX_class"] = "NXentry"
entry.attrs["definition"] = "NXsas"
entry.attrs["start_time"] = data.get("start_time")
entry.attrs["end_time"] = data.get("end_time")
entry.attrs["version"] = 1.0
# # /entry/control
# control = entry.create_group("control")
# control.attrs["NX_class"] = "NXmonitor"
# control.create_dataset(name="mode", data="monitor")
# /entry/collection
collection = entry.create_group("collection")
collection.attrs["NX_class"] = "NXcollection"
bec_collection = collection.create_group("bec")
# #########
# # EXAMPLE for soft link
# #########
# # /entry/data
# if "eiger_4" in self.device_manager.devices:
# entry.create_soft_link(name="data", target="/entry/instrument/eiger_4")
# /entry/control
control = entry.create_group("control")
control.attrs["NX_class"] = "NXmonitor"
control.create_dataset(name="mode", data="monitor")
control.create_dataset(name="integral", data=get_entry(data, "bpm4i"))
# ########
# # EXAMPLE for external link
# ########
# # control = entry.create_group("sample")
# # control.create_ext_link("data", self.file_references["eiger9m"]["path"], "EG9M/data")
# /entry/data
main_data = entry.create_group("data")
main_data.attrs["NX_class"] = "NXdata"
if "eiger_4" in device_manager.devices:
main_data.create_soft_link(name="data", target="/entry/instrument/eiger_4/data")
elif "eiger9m" in device_manager.devices:
main_data.create_soft_link(name="data", target="/entry/instrument/eiger9m/data")
elif "pilatus_2" in device_manager.devices:
main_data.create_soft_link(name="data", target="/entry/instrument/pilatus_2/data")
# # /entry/sample
# control = entry.create_group("sample")
# control.attrs["NX_class"] = "NXsample"
# control.create_dataset(name="name", data=self.data.get("samplename"))
# control.create_dataset(name="description", data=self.data.get("sample_description"))
# /entry/sample
control = entry.create_group("sample")
control.attrs["NX_class"] = "NXsample"
control.create_dataset(name="name", data=get_entry(data, "samplename"))
control.create_dataset(name="description", data=data.get("sample_description"))
x_translation = control.create_dataset(name="x_translation", data=get_entry(data, "samx"))
x_translation.attrs["units"] = "mm"
y_translation = control.create_dataset(name="y_translation", data=get_entry(data, "samy"))
y_translation.attrs["units"] = "mm"
temperature_log = control.create_dataset(name="temperature_log", data=get_entry(data, "temp"))
temperature_log.attrs["units"] = "K"
# # /entry/instrument
# instrument = entry.create_group("instrument")
# instrument.attrs["NX_class"] = "NXinstrument"
# /entry/instrument
instrument = entry.create_group("instrument")
instrument.attrs["NX_class"] = "NXinstrument"
instrument.create_dataset(name="name", data="cSAXS beamline")
# source = instrument.create_group("source")
# source.attrs["NX_class"] = "NXsource"
# source.create_dataset(name="type", data="Synchrotron X-ray Source")
# source.create_dataset(name="name", data="Swiss Light Source")
# source.create_dataset(name="probe", data="x-ray")
source = instrument.create_group("source")
source.attrs["NX_class"] = "NXsource"
source.create_dataset(name="type", data="Synchrotron X-ray Source")
source.create_dataset(name="name", data="Swiss Light Source")
source.create_dataset(name="probe", data="x-ray")
distance = source.create_dataset(
name="distance", data=-33800 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
sigma_x = source.create_dataset(name="sigma_x", data=0.202)
sigma_x.attrs["units"] = "mm"
sigma_y = source.create_dataset(name="sigma_y", data=0.018)
sigma_y.attrs["units"] = "mm"
divergence_x = source.create_dataset(name="divergence_x", data=0.000135)
divergence_x.attrs["units"] = "radians"
divergence_y = source.create_dataset(name="divergence_y", data=0.000025)
divergence_y.attrs["units"] = "radians"
current = source.create_dataset(name="current", data=get_entry(data, "curr"))
current.attrs["units"] = "mA"
# # /entry
# entry = self.storage.create_group("entry")
# entry.attrs["NX_class"] = "NXentry"
# entry.attrs["definition"] = "NXsas"
# entry.attrs["start_time"] = self.data.get("start_time")
# entry.attrs["end_time"] = self.data.get("end_time")
# entry.attrs["version"] = 1.0
insertion_device = instrument.create_group("insertion_device")
insertion_device.attrs["NX_class"] = "NXinsertion_device"
source.create_dataset(name="type", data="undulator")
gap = source.create_dataset(name="gap", data=get_entry(data, "idgap"))
gap.attrs["units"] = "mm"
k = source.create_dataset(name="k", data=2.46)
k.attrs["units"] = "NX_DIMENSIONLESS"
length = source.create_dataset(name="length", data=1820)
length.attrs["units"] = "mm"
# # /entry/control
# control = entry.create_group("control")
# control.attrs["NX_class"] = "NXmonitor"
# control.create_dataset(name="mode", data="monitor")
# control.create_dataset(name="integral", data=self.get_entry("bpm4i"))
slit_0 = instrument.create_group("slit_0")
slit_0.attrs["NX_class"] = "NXslit"
source.create_dataset(name="material", data="OFHC Cu")
source.create_dataset(name="description", data="Horizontal secondary source slit")
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl0wh"))
x_gap.attrs["units"] = "mm"
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl0ch"))
x_translation.attrs["units"] = "mm"
distance = source.create_dataset(
name="distance", data=-21700 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
# # /entry/data
# main_data = entry.create_group("data")
# main_data.attrs["NX_class"] = "NXdata"
# if "eiger_4" in self.device_manager.devices:
# main_data.create_soft_link(name="data", target="/entry/instrument/eiger_4/data")
# elif "eiger9m" in self.device_manager.devices:
# main_data.create_soft_link(name="data", target="/entry/instrument/eiger9m/data")
# elif "pilatus_2" in self.device_manager.devices:
# main_data.create_soft_link(name="data", target="/entry/instrument/pilatus_2/data")
slit_1 = instrument.create_group("slit_1")
slit_1.attrs["NX_class"] = "NXslit"
source.create_dataset(name="material", data="OFHC Cu")
source.create_dataset(name="description", data="Horizontal secondary source slit")
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl1wh"))
x_gap.attrs["units"] = "mm"
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl1wv"))
y_gap.attrs["units"] = "mm"
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl1ch"))
x_translation.attrs["units"] = "mm"
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl1ch"))
height.attrs["units"] = "mm"
distance = source.create_dataset(
name="distance", data=-7800 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
# # /entry/sample
# control = entry.create_group("sample")
# control.attrs["NX_class"] = "NXsample"
# control.create_dataset(name="name", data=self.get_entry("samplename"))
# control.create_dataset(name="description", data=self.data.get("sample_description"))
# x_translation = control.create_dataset(name="x_translation", data=self.get_entry("samx"))
# x_translation.attrs["units"] = "mm"
# y_translation = control.create_dataset(name="y_translation", data=self.get_entry("samy"))
# y_translation.attrs["units"] = "mm"
# temperature_log = control.create_dataset(
# name="temperature_log", data=self.get_entry("temp")
# )
# temperature_log.attrs["units"] = "K"
mono = instrument.create_group("monochromator")
mono.attrs["NX_class"] = "NXmonochromator"
mokev = data.get("mokev", {})
if mokev:
if isinstance(mokev, list):
mokev = mokev[0]
wavelength = mono.create_dataset(
name="wavelength", data=12.3984193 / (mokev.get("mokev").get("value") + 1e-9)
)
wavelength.attrs["units"] = "Angstrom"
energy = mono.create_dataset(name="energy", data=mokev.get("mokev").get("value"))
energy.attrs["units"] = "keV"
mono.create_dataset(name="type", data="Double crystal fixed exit monochromator.")
distance = mono.create_dataset(
name="distance", data=-5220 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
# # /entry/instrument
# instrument = entry.create_group("instrument")
# instrument.attrs["NX_class"] = "NXinstrument"
# instrument.create_dataset(name="name", data="cSAXS beamline")
crystal_1 = mono.create_group("crystal_1")
crystal_1.attrs["NX_class"] = "NXcrystal"
crystal_1.create_dataset(name="usage", data="Bragg")
crystal_1.create_dataset(name="order_no", data="1")
crystal_1.create_dataset(name="reflection", data="[1 1 1]")
bragg_angle = crystal_1.create_dataset(name="bragg_angle", data=get_entry(data, "moth1"))
bragg_angle.attrs["units"] = "degrees"
# source = instrument.create_group("source")
# source.attrs["NX_class"] = "NXsource"
# source.create_dataset(name="type", data="Synchrotron X-ray Source")
# source.create_dataset(name="name", data="Swiss Light Source")
# source.create_dataset(name="probe", data="x-ray")
# distance = source.create_dataset(
# name="distance", data=-33800 - np.asarray(self.get_entry("samz", 0))
# )
# distance.attrs["units"] = "mm"
# sigma_x = source.create_dataset(name="sigma_x", data=0.202)
# sigma_x.attrs["units"] = "mm"
# sigma_y = source.create_dataset(name="sigma_y", data=0.018)
# sigma_y.attrs["units"] = "mm"
# divergence_x = source.create_dataset(name="divergence_x", data=0.000135)
# divergence_x.attrs["units"] = "radians"
# divergence_y = source.create_dataset(name="divergence_y", data=0.000025)
# divergence_y.attrs["units"] = "radians"
# current = source.create_dataset(name="current", data=self.get_entry("curr"))
# current.attrs["units"] = "mA"
crystal_2 = mono.create_group("crystal_2")
crystal_2.attrs["NX_class"] = "NXcrystal"
crystal_2.create_dataset(name="usage", data="Bragg")
crystal_2.create_dataset(name="order_no", data="2")
crystal_2.create_dataset(name="reflection", data="[1 1 1]")
bragg_angle = crystal_2.create_dataset(name="bragg_angle", data=get_entry(data, "moth1"))
bragg_angle.attrs["units"] = "degrees"
bend_x = crystal_2.create_dataset(name="bend_x", data=get_entry(data, "mobd"))
bend_x.attrs["units"] = "degrees"
# insertion_device = instrument.create_group("insertion_device")
# insertion_device.attrs["NX_class"] = "NXinsertion_device"
# source.create_dataset(name="type", data="undulator")
# gap = source.create_dataset(name="gap", data=self.get_entry("idgap"))
# gap.attrs["units"] = "mm"
# k = source.create_dataset(name="k", data=2.46)
# k.attrs["units"] = "NX_DIMENSIONLESS"
# length = source.create_dataset(name="length", data=1820)
# length.attrs["units"] = "mm"
xbpm4 = instrument.create_group("XBPM4")
xbpm4.attrs["NX_class"] = "NXdetector"
xbpm4_sum = xbpm4.create_group("XBPM4_sum")
xbpm4_sum_data = xbpm4_sum.create_dataset(name="data", data=get_entry(data, "bpm4s"))
xbpm4_sum_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm4_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.")
xbpm4_x = xbpm4.create_group("XBPM4_x")
xbpm4_x_data = xbpm4_x.create_dataset(name="data", data=get_entry(data, "bpm4x"))
xbpm4_x_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm4_x.create_dataset(
name="description", data="Normalized difference of counts between left and right quadrants."
)
xbpm4_y = xbpm4.create_group("XBPM4_y")
xbpm4_y_data = xbpm4_y.create_dataset(name="data", data=get_entry(data, "bpm4y"))
xbpm4_y_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm4_y.create_dataset(
name="description", data="Normalized difference of counts between high and low quadrants."
)
xbpm4_skew = xbpm4.create_group("XBPM4_skew")
xbpm4_skew_data = xbpm4_skew.create_dataset(name="data", data=get_entry(data, "bpm4z"))
xbpm4_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm4_skew.create_dataset(
name="description", data="Normalized difference of counts between diagonal quadrants."
)
# slit_0 = instrument.create_group("slit_0")
# slit_0.attrs["NX_class"] = "NXslit"
# source.create_dataset(name="material", data="OFHC Cu")
# source.create_dataset(name="description", data="Horizontal secondary source slit")
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl0wh"))
# x_gap.attrs["units"] = "mm"
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl0ch"))
# x_translation.attrs["units"] = "mm"
# distance = source.create_dataset(
# name="distance", data=-21700 - np.asarray(self.get_entry("samz", 0))
# )
# distance.attrs["units"] = "mm"
mirror = instrument.create_group("mirror")
mirror.attrs["NX_class"] = "NXmirror"
mirror.create_dataset(name="type", data="single")
mirror.create_dataset(
name="description",
data="Grazing incidence mirror to reject high-harmonic wavelengths from the monochromator. There are three coating options available that are used depending on the X-ray energy, no coating (SiO2), rhodium (Rh) or platinum (Pt).",
)
incident_angle = mirror.create_dataset(name="incident_angle", data=get_entry(data, "mith"))
incident_angle.attrs["units"] = "degrees"
substrate_material = mirror.create_dataset(name="substrate_material", data="SiO2")
substrate_material.attrs["units"] = "NX_CHAR"
coating_material = mirror.create_dataset(name="coating_material", data="SiO2")
coating_material.attrs["units"] = "NX_CHAR"
bend_y = mirror.create_dataset(name="bend_y", data="mibd")
bend_y.attrs["units"] = "NX_DIMENSIONLESS"
distance = mirror.create_dataset(
name="distance", data=-4370 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
# slit_1 = instrument.create_group("slit_1")
# slit_1.attrs["NX_class"] = "NXslit"
# source.create_dataset(name="material", data="OFHC Cu")
# source.create_dataset(name="description", data="Horizontal secondary source slit")
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl1wh"))
# x_gap.attrs["units"] = "mm"
# y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl1wv"))
# y_gap.attrs["units"] = "mm"
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl1ch"))
# x_translation.attrs["units"] = "mm"
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl1ch"))
# height.attrs["units"] = "mm"
# distance = source.create_dataset(
# name="distance", data=-7800 - np.asarray(self.get_entry("samz", 0))
# )
# distance.attrs["units"] = "mm"
xbpm5 = instrument.create_group("XBPM5")
xbpm5.attrs["NX_class"] = "NXdetector"
xbpm5_sum = xbpm5.create_group("XBPM5_sum")
xbpm5_sum_data = xbpm5_sum.create_dataset(name="data", data=get_entry(data, "bpm5s"))
xbpm5_sum_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm5_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.")
xbpm5_x = xbpm5.create_group("XBPM5_x")
xbpm5_x_data = xbpm5_x.create_dataset(name="data", data=get_entry(data, "bpm5x"))
xbpm5_x_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm5_x.create_dataset(
name="description", data="Normalized difference of counts between left and right quadrants."
)
xbpm5_y = xbpm5.create_group("XBPM5_y")
xbpm5_y_data = xbpm5_y.create_dataset(name="data", data=get_entry(data, "bpm5y"))
xbpm5_y_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm5_y.create_dataset(
name="description", data="Normalized difference of counts between high and low quadrants."
)
xbpm5_skew = xbpm5.create_group("XBPM5_skew")
xbpm5_skew_data = xbpm5_skew.create_dataset(name="data", data=get_entry(data, "bpm5z"))
xbpm5_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm5_skew.create_dataset(
name="description", data="Normalized difference of counts between diagonal quadrants."
)
# mono = instrument.create_group("monochromator")
# mono.attrs["NX_class"] = "NXmonochromator"
# mokev = self.data.get("mokev", {})
# if mokev:
# if isinstance(mokev, list):
# mokev = mokev[0]
# wavelength = mono.create_dataset(
# name="wavelength", data=12.3984193 / (mokev.get("mokev").get("value") + 1e-9)
# )
# wavelength.attrs["units"] = "Angstrom"
# energy = mono.create_dataset(name="energy", data=mokev.get("mokev").get("value"))
# energy.attrs["units"] = "keV"
# mono.create_dataset(name="type", data="Double crystal fixed exit monochromator.")
# distance = mono.create_dataset(
# name="distance", data=-5220 - np.asarray(self.get_entry("samz", 0))
# )
# distance.attrs["units"] = "mm"
slit_2 = instrument.create_group("slit_2")
slit_2.attrs["NX_class"] = "NXslit"
source.create_dataset(name="material", data="Ag")
source.create_dataset(name="description", data="Slit 2, optics hutch")
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl2wh"))
x_gap.attrs["units"] = "mm"
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl2wv"))
y_gap.attrs["units"] = "mm"
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl2ch"))
x_translation.attrs["units"] = "mm"
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl2cv"))
height.attrs["units"] = "mm"
distance = source.create_dataset(
name="distance", data=-3140 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
# crystal_1 = mono.create_group("crystal_1")
# crystal_1.attrs["NX_class"] = "NXcrystal"
# crystal_1.create_dataset(name="usage", data="Bragg")
# crystal_1.create_dataset(name="order_no", data="1")
# crystal_1.create_dataset(name="reflection", data="[1 1 1]")
# bragg_angle = crystal_1.create_dataset(name="bragg_angle", data=self.get_entry("moth1"))
# bragg_angle.attrs["units"] = "degrees"
slit_3 = instrument.create_group("slit_3")
slit_3.attrs["NX_class"] = "NXslit"
source.create_dataset(name="material", data="Si")
source.create_dataset(name="description", data="Slit 3, experimental hutch, exposure box")
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl3wh"))
x_gap.attrs["units"] = "mm"
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl3wv"))
y_gap.attrs["units"] = "mm"
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl3ch"))
x_translation.attrs["units"] = "mm"
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl3cv"))
height.attrs["units"] = "mm"
# distance = source.create_dataset(name="distance", data=-3140 - get_entry(data, "samz", 0))
# distance.attrs["units"] = "mm"
# crystal_2 = mono.create_group("crystal_2")
# crystal_2.attrs["NX_class"] = "NXcrystal"
# crystal_2.create_dataset(name="usage", data="Bragg")
# crystal_2.create_dataset(name="order_no", data="2")
# crystal_2.create_dataset(name="reflection", data="[1 1 1]")
# bragg_angle = crystal_2.create_dataset(name="bragg_angle", data=self.get_entry("moth1"))
# bragg_angle.attrs["units"] = "degrees"
# bend_x = crystal_2.create_dataset(name="bend_x", data=self.get_entry("mobd"))
# bend_x.attrs["units"] = "degrees"
filter_set = instrument.create_group("filter_set")
filter_set.attrs["NX_class"] = "NXattenuator"
filter_set.create_dataset(name="material", data="Si")
filter_set.create_dataset(
name="description",
data="The filter set consists of 4 linear stages, each with five filter positions. Additionally, each one allows for an out position to allow 'no filtering'.",
)
attenuator_transmission = filter_set.create_dataset(
name="attenuator_transmission", data=10 ** get_entry(data, "ftrans", 0)
)
attenuator_transmission.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm4 = instrument.create_group("XBPM4")
# xbpm4.attrs["NX_class"] = "NXdetector"
# xbpm4_sum = xbpm4.create_group("XBPM4_sum")
# xbpm4_sum_data = xbpm4_sum.create_dataset(name="data", data=self.get_entry("bpm4s"))
# xbpm4_sum_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm4_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.")
# xbpm4_x = xbpm4.create_group("XBPM4_x")
# xbpm4_x_data = xbpm4_x.create_dataset(name="data", data=self.get_entry("bpm4x"))
# xbpm4_x_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm4_x.create_dataset(
# name="description",
# data="Normalized difference of counts between left and right quadrants.",
# )
# xbpm4_y = xbpm4.create_group("XBPM4_y")
# xbpm4_y_data = xbpm4_y.create_dataset(name="data", data=self.get_entry("bpm4y"))
# xbpm4_y_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm4_y.create_dataset(
# name="description",
# data="Normalized difference of counts between high and low quadrants.",
# )
# xbpm4_skew = xbpm4.create_group("XBPM4_skew")
# xbpm4_skew_data = xbpm4_skew.create_dataset(name="data", data=self.get_entry("bpm4z"))
# xbpm4_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm4_skew.create_dataset(
# name="description", data="Normalized difference of counts between diagonal quadrants."
# )
slit_4 = instrument.create_group("slit_4")
slit_4.attrs["NX_class"] = "NXslit"
source.create_dataset(name="material", data="Si")
source.create_dataset(name="description", data="Slit 4, experimental hutch, exposure box")
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl4wh"))
x_gap.attrs["units"] = "mm"
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl4wv"))
y_gap.attrs["units"] = "mm"
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl4ch"))
x_translation.attrs["units"] = "mm"
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl4cv"))
height.attrs["units"] = "mm"
# distance = source.create_dataset(name="distance", data=-3140 - get_entry(data, "samz", 0))
# distance.attrs["units"] = "mm"
# mirror = instrument.create_group("mirror")
# mirror.attrs["NX_class"] = "NXmirror"
# mirror.create_dataset(name="type", data="single")
# mirror.create_dataset(
# name="description",
# data="Grazing incidence mirror to reject high-harmonic wavelengths from the monochromator. There are three coating options available that are used depending on the X-ray energy, no coating (SiO2), rhodium (Rh) or platinum (Pt).",
# )
# incident_angle = mirror.create_dataset(name="incident_angle", data=self.get_entry("mith"))
# incident_angle.attrs["units"] = "degrees"
# substrate_material = mirror.create_dataset(name="substrate_material", data="SiO2")
# substrate_material.attrs["units"] = "NX_CHAR"
# coating_material = mirror.create_dataset(name="coating_material", data="SiO2")
# coating_material.attrs["units"] = "NX_CHAR"
# bend_y = mirror.create_dataset(name="bend_y", data="mibd")
# bend_y.attrs["units"] = "NX_DIMENSIONLESS"
# distance = mirror.create_dataset(
# name="distance", data=-4370 - np.asarray(self.get_entry("samz", 0))
# )
# distance.attrs["units"] = "mm"
slit_5 = instrument.create_group("slit_5")
slit_5.attrs["NX_class"] = "NXslit"
source.create_dataset(name="material", data="Si")
source.create_dataset(name="description", data="Slit 5, experimental hutch, exposure box")
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl5wh"))
x_gap.attrs["units"] = "mm"
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl5wv"))
y_gap.attrs["units"] = "mm"
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl5ch"))
x_translation.attrs["units"] = "mm"
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl5cv"))
height.attrs["units"] = "mm"
# distance = source.create_dataset(name="distance", data=-3140 - get_entry(data, "samz", 0))
# distance.attrs["units"] = "mm"
# xbpm5 = instrument.create_group("XBPM5")
# xbpm5.attrs["NX_class"] = "NXdetector"
# xbpm5_sum = xbpm5.create_group("XBPM5_sum")
# xbpm5_sum_data = xbpm5_sum.create_dataset(name="data", data=self.get_entry("bpm5s"))
# xbpm5_sum_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm5_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.")
# xbpm5_x = xbpm5.create_group("XBPM5_x")
# xbpm5_x_data = xbpm5_x.create_dataset(name="data", data=self.get_entry("bpm5x"))
# xbpm5_x_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm5_x.create_dataset(
# name="description",
# data="Normalized difference of counts between left and right quadrants.",
# )
# xbpm5_y = xbpm5.create_group("XBPM5_y")
# xbpm5_y_data = xbpm5_y.create_dataset(name="data", data=self.get_entry("bpm5y"))
# xbpm5_y_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm5_y.create_dataset(
# name="description",
# data="Normalized difference of counts between high and low quadrants.",
# )
# xbpm5_skew = xbpm5.create_group("XBPM5_skew")
# xbpm5_skew_data = xbpm5_skew.create_dataset(name="data", data=self.get_entry("bpm5z"))
# xbpm5_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm5_skew.create_dataset(
# name="description", data="Normalized difference of counts between diagonal quadrants."
# )
beam_stop_1 = instrument.create_group("beam_stop_1")
beam_stop_1.attrs["NX_class"] = "NX_beamstop"
beam_stop_1.create_dataset(name="description", data="circular")
bms1_size = beam_stop_1.create_dataset(name="size", data=3)
bms1_size.attrs["units"] = "mm"
bms1_x = beam_stop_1.create_dataset(name="size", data=get_entry(data, "bs1x"))
bms1_x.attrs["units"] = "mm"
bms1_y = beam_stop_1.create_dataset(name="size", data=get_entry(data, "bs1y"))
bms1_y.attrs["units"] = "mm"
# slit_2 = instrument.create_group("slit_2")
# slit_2.attrs["NX_class"] = "NXslit"
# source.create_dataset(name="material", data="Ag")
# source.create_dataset(name="description", data="Slit 2, optics hutch")
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl2wh"))
# x_gap.attrs["units"] = "mm"
# y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl2wv"))
# y_gap.attrs["units"] = "mm"
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl2ch"))
# x_translation.attrs["units"] = "mm"
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl2cv"))
# height.attrs["units"] = "mm"
# distance = source.create_dataset(
# name="distance", data=-3140 - np.asarray(self.get_entry("samz", 0))
# )
# distance.attrs["units"] = "mm"
beam_stop_2 = instrument.create_group("beam_stop_2")
beam_stop_2.attrs["NX_class"] = "NX_beamstop"
beam_stop_2.create_dataset(name="description", data="rectangular")
bms2_size_x = beam_stop_2.create_dataset(name="size_x", data=5)
bms2_size_x.attrs["units"] = "mm"
bms2_size_y = beam_stop_2.create_dataset(name="size_y", data=2.25)
bms2_size_y.attrs["units"] = "mm"
bms2_x = beam_stop_2.create_dataset(name="size", data=get_entry(data, "bs2x"))
bms2_x.attrs["units"] = "mm"
bms2_y = beam_stop_2.create_dataset(name="size", data=get_entry(data, "bs2y"))
bms2_y.attrs["units"] = "mm"
bms2_data = beam_stop_2.create_dataset(name="data", data=get_entry(data, "diode"))
bms2_data.attrs["units"] = "NX_DIMENSIONLESS"
# slit_3 = instrument.create_group("slit_3")
# slit_3.attrs["NX_class"] = "NXslit"
# source.create_dataset(name="material", data="Si")
# source.create_dataset(name="description", data="Slit 3, experimental hutch, exposure box")
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl3wh"))
# x_gap.attrs["units"] = "mm"
# y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl3wv"))
# y_gap.attrs["units"] = "mm"
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl3ch"))
# x_translation.attrs["units"] = "mm"
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl3cv"))
# height.attrs["units"] = "mm"
# # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0))
# # distance.attrs["units"] = "mm"
if "eiger1p5m" in device_manager.devices and device_manager.devices.eiger1p5m.enabled:
eiger_4 = instrument.create_group("eiger_4")
eiger_4.attrs["NX_class"] = "NXdetector"
x_pixel_size = eiger_4.create_dataset(name="x_pixel_size", data=75)
x_pixel_size.attrs["units"] = "um"
y_pixel_size = eiger_4.create_dataset(name="y_pixel_size", data=75)
y_pixel_size.attrs["units"] = "um"
polar_angle = eiger_4.create_dataset(name="polar_angle", data=0)
polar_angle.attrs["units"] = "degrees"
azimuthal_angle = eiger_4.create_dataset(name="azimuthal_angle", data=0)
azimuthal_angle.attrs["units"] = "degrees"
rotation_angle = eiger_4.create_dataset(name="rotation_angle", data=0)
rotation_angle.attrs["units"] = "degrees"
description = eiger_4.create_dataset(
name="description", data="Single-photon counting detector, 320 micron-thick Si chip"
)
orientation = eiger_4.create_group("orientation")
orientation.attrs["description"] = (
"Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
)
orientation.create_dataset(name="transpose", data=1)
orientation.create_dataset(name="rot90", data=3)
# filter_set = instrument.create_group("filter_set")
# filter_set.attrs["NX_class"] = "NXattenuator"
# filter_set.create_dataset(name="material", data="Si")
# filter_set.create_dataset(
# name="description",
# data="The filter set consists of 4 linear stages, each with five filter positions. Additionally, each one allows for an out position to allow 'no filtering'.",
# )
# attenuator_transmission = filter_set.create_dataset(
# name="attenuator_transmission", data=10 ** self.get_entry("ftrans", 0)
# )
# attenuator_transmission.attrs["units"] = "NX_DIMENSIONLESS"
if (
"eiger9m" in device_manager.devices
and device_manager.devices.eiger9m.enabled
and "eiger9m" in file_references
):
eiger9m = instrument.create_group("eiger9m")
eiger9m.attrs["NX_class"] = "NXdetector"
x_pixel_size = eiger9m.create_dataset(name="x_pixel_size", data=75)
x_pixel_size.attrs["units"] = "um"
y_pixel_size = eiger9m.create_dataset(name="y_pixel_size", data=75)
y_pixel_size.attrs["units"] = "um"
polar_angle = eiger9m.create_dataset(name="polar_angle", data=0)
polar_angle.attrs["units"] = "degrees"
azimuthal_angle = eiger9m.create_dataset(name="azimuthal_angle", data=0)
azimuthal_angle.attrs["units"] = "degrees"
rotation_angle = eiger9m.create_dataset(name="rotation_angle", data=0)
rotation_angle.attrs["units"] = "degrees"
description = eiger9m.create_dataset(
name="description", data="Eiger9M detector, in-house developed, Paul Scherrer Institute"
)
orientation = eiger9m.create_group("orientation")
orientation.attrs["description"] = (
"Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
)
orientation.create_dataset(name="transpose", data=1)
orientation.create_dataset(name="rot90", data=3)
data = eiger9m.create_ext_link("data", file_references["eiger9m"]["path"], "EG9M/data")
status = eiger9m.create_ext_link(
"status", file_references["eiger9m"]["path"], "EG9M/status"
)
# slit_4 = instrument.create_group("slit_4")
# slit_4.attrs["NX_class"] = "NXslit"
# source.create_dataset(name="material", data="Si")
# source.create_dataset(name="description", data="Slit 4, experimental hutch, exposure box")
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl4wh"))
# x_gap.attrs["units"] = "mm"
# y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl4wv"))
# y_gap.attrs["units"] = "mm"
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl4ch"))
# x_translation.attrs["units"] = "mm"
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl4cv"))
# height.attrs["units"] = "mm"
# # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0))
# # distance.attrs["units"] = "mm"
if (
"pilatus_2" in device_manager.devices
and device_manager.devices.pilatus_2.enabled
and "pilatus_2" in file_references
):
pilatus_2 = instrument.create_group("pilatus_2")
pilatus_2.attrs["NX_class"] = "NXdetector"
x_pixel_size = pilatus_2.create_dataset(name="x_pixel_size", data=172)
x_pixel_size.attrs["units"] = "um"
y_pixel_size = pilatus_2.create_dataset(name="y_pixel_size", data=172)
y_pixel_size.attrs["units"] = "um"
polar_angle = pilatus_2.create_dataset(name="polar_angle", data=0)
polar_angle.attrs["units"] = "degrees"
azimuthal_angle = pilatus_2.create_dataset(name="azimuthal_angle", data=0)
azimuthal_angle.attrs["units"] = "degrees"
rotation_angle = pilatus_2.create_dataset(name="rotation_angle", data=0)
rotation_angle.attrs["units"] = "degrees"
description = pilatus_2.create_dataset(
name="description", data="Pilatus 300K detector, Dectris, Switzerland"
)
orientation = pilatus_2.create_group("orientation")
orientation.attrs["description"] = (
"Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
)
orientation.create_dataset(name="transpose", data=1)
orientation.create_dataset(name="rot90", data=2)
data = pilatus_2.create_ext_link(
"data", file_references["pilatus_2"]["path"], "entry/instrument/pilatus_2/data"
)
# slit_5 = instrument.create_group("slit_5")
# slit_5.attrs["NX_class"] = "NXslit"
# source.create_dataset(name="material", data="Si")
# source.create_dataset(name="description", data="Slit 5, experimental hutch, exposure box")
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl5wh"))
# x_gap.attrs["units"] = "mm"
# y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl5wv"))
# y_gap.attrs["units"] = "mm"
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl5ch"))
# x_translation.attrs["units"] = "mm"
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl5cv"))
# height.attrs["units"] = "mm"
# # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0))
# # distance.attrs["units"] = "mm"
if (
"falcon" in device_manager.devices
and device_manager.devices.falcon.enabled
and "falcon" in file_references
):
falcon = instrument.create_ext_link(
"falcon", file_references["falcon"]["path"], "entry/instrument/FalconX1"
)
# beam_stop_1 = instrument.create_group("beam_stop_1")
# beam_stop_1.attrs["NX_class"] = "NX_beamstop"
# beam_stop_1.create_dataset(name="description", data="circular")
# bms1_size = beam_stop_1.create_dataset(name="size", data=3)
# bms1_size.attrs["units"] = "mm"
# bms1_x = beam_stop_1.create_dataset(name="size", data=self.get_entry("bs1x"))
# bms1_x.attrs["units"] = "mm"
# bms1_y = beam_stop_1.create_dataset(name="size", data=self.get_entry("bs1y"))
# bms1_y.attrs["units"] = "mm"
# beam_stop_2 = instrument.create_group("beam_stop_2")
# beam_stop_2.attrs["NX_class"] = "NX_beamstop"
# beam_stop_2.create_dataset(name="description", data="rectangular")
# bms2_size_x = beam_stop_2.create_dataset(name="size_x", data=5)
# bms2_size_x.attrs["units"] = "mm"
# bms2_size_y = beam_stop_2.create_dataset(name="size_y", data=2.25)
# bms2_size_y.attrs["units"] = "mm"
# bms2_x = beam_stop_2.create_dataset(name="size", data=self.get_entry("bs2x"))
# bms2_x.attrs["units"] = "mm"
# bms2_y = beam_stop_2.create_dataset(name="size", data=self.get_entry("bs2y"))
# bms2_y.attrs["units"] = "mm"
# bms2_data = beam_stop_2.create_dataset(name="data", data=self.get_entry("diode"))
# bms2_data.attrs["units"] = "NX_DIMENSIONLESS"
# if (
# "eiger1p5m" in self.device_manager.devices
# and self.device_manager.devices.eiger1p5m.enabled
# ):
# eiger_4 = instrument.create_group("eiger_4")
# eiger_4.attrs["NX_class"] = "NXdetector"
# x_pixel_size = eiger_4.create_dataset(name="x_pixel_size", data=75)
# x_pixel_size.attrs["units"] = "um"
# y_pixel_size = eiger_4.create_dataset(name="y_pixel_size", data=75)
# y_pixel_size.attrs["units"] = "um"
# polar_angle = eiger_4.create_dataset(name="polar_angle", data=0)
# polar_angle.attrs["units"] = "degrees"
# azimuthal_angle = eiger_4.create_dataset(name="azimuthal_angle", data=0)
# azimuthal_angle.attrs["units"] = "degrees"
# rotation_angle = eiger_4.create_dataset(name="rotation_angle", data=0)
# rotation_angle.attrs["units"] = "degrees"
# description = eiger_4.create_dataset(
# name="description", data="Single-photon counting detector, 320 micron-thick Si chip"
# )
# orientation = eiger_4.create_group("orientation")
# orientation.attrs["description"] = (
# "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
# )
# orientation.create_dataset(name="transpose", data=1)
# orientation.create_dataset(name="rot90", data=3)
# if (
# "eiger9m" in self.device_manager.devices
# and self.device_manager.devices.eiger9m.enabled
# and "eiger9m" in self.file_references
# ):
# eiger9m = instrument.create_group("eiger9m")
# eiger9m.attrs["NX_class"] = "NXdetector"
# x_pixel_size = eiger9m.create_dataset(name="x_pixel_size", data=75)
# x_pixel_size.attrs["units"] = "um"
# y_pixel_size = eiger9m.create_dataset(name="y_pixel_size", data=75)
# y_pixel_size.attrs["units"] = "um"
# polar_angle = eiger9m.create_dataset(name="polar_angle", data=0)
# polar_angle.attrs["units"] = "degrees"
# azimuthal_angle = eiger9m.create_dataset(name="azimuthal_angle", data=0)
# azimuthal_angle.attrs["units"] = "degrees"
# rotation_angle = eiger9m.create_dataset(name="rotation_angle", data=0)
# rotation_angle.attrs["units"] = "degrees"
# description = eiger9m.create_dataset(
# name="description",
# data="Eiger9M detector, in-house developed, Paul Scherrer Institute",
# )
# orientation = eiger9m.create_group("orientation")
# orientation.attrs["description"] = (
# "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
# )
# orientation.create_dataset(name="transpose", data=1)
# orientation.create_dataset(name="rot90", data=3)
# data = eiger9m.create_ext_link(
# "data", self.file_references["eiger9m"]["path"], "EG9M/data"
# )
# status = eiger9m.create_ext_link(
# "status", self.file_references["eiger9m"]["path"], "EG9M/status"
# )
# if (
# "pilatus_2" in self.device_manager.devices
# and self.device_manager.devices.pilatus_2.enabled
# and "pilatus_2" in self.file_references
# ):
# pilatus_2 = instrument.create_group("pilatus_2")
# pilatus_2.attrs["NX_class"] = "NXdetector"
# x_pixel_size = pilatus_2.create_dataset(name="x_pixel_size", data=172)
# x_pixel_size.attrs["units"] = "um"
# y_pixel_size = pilatus_2.create_dataset(name="y_pixel_size", data=172)
# y_pixel_size.attrs["units"] = "um"
# polar_angle = pilatus_2.create_dataset(name="polar_angle", data=0)
# polar_angle.attrs["units"] = "degrees"
# azimuthal_angle = pilatus_2.create_dataset(name="azimuthal_angle", data=0)
# azimuthal_angle.attrs["units"] = "degrees"
# rotation_angle = pilatus_2.create_dataset(name="rotation_angle", data=0)
# rotation_angle.attrs["units"] = "degrees"
# description = pilatus_2.create_dataset(
# name="description", data="Pilatus 300K detector, Dectris, Switzerland"
# )
# orientation = pilatus_2.create_group("orientation")
# orientation.attrs["description"] = (
# "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
# )
# orientation.create_dataset(name="transpose", data=1)
# orientation.create_dataset(name="rot90", data=2)
# data = pilatus_2.create_ext_link(
# "data", self.file_references["pilatus_2"]["path"], "entry/instrument/pilatus_2/data"
# )
# if (
# "falcon" in self.device_manager.devices
# and self.device_manager.devices.falcon.enabled
# and "falcon" in self.file_references
# ):
# falcon = instrument.create_ext_link(
# "falcon", self.file_references["falcon"]["path"], "entry/instrument/FalconX1"
# )
return storage

View File

@@ -105,11 +105,11 @@ class FlomniFermatScan(AsyncFlyScanBase):
def scan_report_instructions(self):
"""Scan report instructions for the progress bar"""
yield from self.stubs.scan_report_instruction({"device_progress": ["rt_positions"]})
yield from self.stubs.scan_report_instruction({"device_progress": ["rt_flyer"]})
@property
def monitor_sync(self) -> str:
return "rt_positions"
return "rt_flyer"
def initialize(self):
self.scan_motors = []
@@ -294,10 +294,10 @@ class FlomniFermatScan(AsyncFlyScanBase):
def scan_core(self):
# send off the flyer
yield from self.stubs.kickoff(device="rt_positions")
yield from self.stubs.kickoff(device="rt_flyer")
# start the readout loop of the flyer
status = yield from self.stubs.complete(device="rt_positions", wait=False)
status = yield from self.stubs.complete(device="rt_flyer", wait=False)
# read the monitors until the flyer is done
while not status.done:

View File

@@ -0,0 +1,69 @@
import pytest
from bec_server.device_server.tests.utils import DMMock
from csaxs_bec.devices.omny.shutter import MonitorSignal, OMNYFastShutter
@pytest.mark.parametrize("auto_monitor", [False, True])
def test_monitor_signal_stores_auto_monitor(auto_monitor):
signal = MonitorSignal(name="signal", auto_monitor=auto_monitor)
assert signal.auto_monitor is auto_monitor
def test_monitor_signal_put_propagates_value_to_readback_callback():
signal = MonitorSignal(name="signal", auto_monitor=True)
initial_value = signal.read()[signal.name]["value"]
callback_values = []
callback_reads = []
def _test_cb(value, old_value, **kwargs):
callback_values.append((value, old_value))
callback_reads.append(kwargs["obj"].read())
signal.subscribe(_test_cb, event_type=signal.SUB_VALUE, run=False)
signal.put(1)
assert callback_values == [(1, initial_value)]
assert len(callback_reads) == 1
assert callback_reads[0][signal.name]["value"] == 1
assert signal.read()[signal.name]["value"] == 1
signal.put(0)
assert callback_values == [(1, initial_value), (0, 1)]
assert len(callback_reads) == 2
assert callback_reads[1][signal.name]["value"] == 0
assert signal.read()[signal.name]["value"] == 0
@pytest.fixture
def omny_fast_shutter():
shutter = OMNYFastShutter(name="omny_fast_shutter", device_manager=DMMock())
try:
yield shutter
finally:
shutter.destroy()
def test_omny_fast_shutter_uses_monitor_signal_with_auto_monitor(omny_fast_shutter):
assert isinstance(omny_fast_shutter.shutter, MonitorSignal)
assert omny_fast_shutter.shutter.auto_monitor is True
def test_omny_fast_shutter_propagates_signal_changes_to_device_readback(omny_fast_shutter):
signal_name = omny_fast_shutter.shutter.name
callback_reads = []
def _test_cb(**kwargs):
callback_reads.append(omny_fast_shutter.read())
omny_fast_shutter.shutter.subscribe(_test_cb, event_type=omny_fast_shutter.shutter.SUB_VALUE, run=False)
omny_fast_shutter.shutter.put(1)
assert len(callback_reads) == 1
assert callback_reads[0][signal_name]["value"] == 1
assert omny_fast_shutter.read()[signal_name]["value"] == 1
assert omny_fast_shutter.fshstatus() == 1