Compare commits

..

31 Commits

Author SHA1 Message Date
x12sa
ac23be094e tomo id on private server added
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m56s
2026-04-01 13:15:10 +02:00
x12sa
10cc820b2c url for tomo id
All checks were successful
CI for csaxs_bec / test (push) Successful in 2m0s
2026-04-01 11:47:53 +02:00
x12sa
acc3fb0104 webpage quet http server, signal 1to1 device, first galil configs
All checks were successful
CI for csaxs_bec / test (push) Successful in 2m39s
2026-03-31 17:50:03 +02:00
6a704c6dd0 test: add tests for bpm and bpm_control
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 2m0s
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 1s
CI for csaxs_bec / test (push) Successful in 1m58s
2026-03-31 14:25:16 +02:00
2e014bd9ea fix(pseudo_devices): fix pseudo devices, bpm and bpm_control 2026-03-31 14:25:16 +02:00
x12sa
006a451220 feat: Add BPM and BPMControl pseudo devices 2026-03-31 14:25:07 +02:00
x12sa
bdc996d3b2 some adjustments in structure
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m57s
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 1s
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-31 12:56:15 +02:00
x12sa
2fac8bc1d7 minor config updates
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m56s
2026-03-31 11:12:55 +02:00
x12sa
bf045dadf1 first version from claude
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-30 17:13:45 +02:00
x12sa
be508cf300 first version with working entries 2026-03-30 16:48:47 +02:00
x12sa
f786e34a0e fix thread stop
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m56s
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 1s
CI for csaxs_bec / test (push) Successful in 1m58s
2026-03-30 14:41:44 +02:00
x12sa
cceedc947a local server added
All checks were successful
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 1s
CI for csaxs_bec / test (push) Successful in 2m0s
2026-03-30 13:07:46 +02:00
x12sa
80de9724d4 removed linear progress bar 2026-03-30 13:07:46 +02:00
x12sa
2ac02e0623 http error message cooldown to 10 mins 2026-03-30 13:07:46 +02:00
x12sa
3c2a0aa484 error message cooldown 2026-03-30 13:07:46 +02:00
x12sa
27f4eca4ae change url and fix in https certificate ignore 2026-03-30 13:07:46 +02:00
x12sa
f2771bd4b6 https without certificate possible 2026-03-30 13:07:46 +02:00
x12sa
546ebf8a58 mod audio gen 2026-03-30 13:07:46 +02:00
d3f1d31bb8 fix(file_writer): Fix file_writer format method.
All checks were successful
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 2s
CI for csaxs_bec / test (push) Successful in 2m25s
2026-03-30 12:34:07 +02:00
6d404cad12 fix(omny/shutter): MonitorSignal wrapper with auto_monitor
All checks were successful
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 1s
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-27 22:41:57 +01:00
x12sa
b67e1c012c renamed rt flyer to rt positions
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m54s
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 1s
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-27 16:05:35 +01:00
x12sa
cbbec12d9b option to upload to a php interface
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-27 15:44:49 +01:00
x12sa
8f4a9f025e some adjustments
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m56s
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 2s
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-27 13:43:28 +01:00
x12sa
1b9b983ab2 wip optics config for energy device
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m56s
2026-03-27 12:48:51 +01:00
x12sa
d7b442969a added motors to endstation config 2026-03-27 12:48:51 +01:00
x12sa
f92db3f169 movable cards and more
All checks were successful
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 3s
CI for csaxs_bec / test (push) Successful in 1m55s
2026-03-26 16:38:51 +01:00
x12sa
55531c8a65 next version 2026-03-26 16:38:51 +01:00
x12sa
1d408818cc next iteration, seems a first good and usable ver. 2026-03-26 16:38:51 +01:00
x12sa
ae2045dd10 fixes in contrast and audio confirm logif 2026-03-26 16:38:51 +01:00
x12sa
fd4d455a5b webpage version2 2026-03-26 16:38:51 +01:00
x12sa
3411aaaeb4 first version of webpage 2026-03-26 16:38:51 +01:00
23 changed files with 4037 additions and 545 deletions

View File

@@ -0,0 +1,89 @@
"""
LamNI/webpage_generator.py
===========================
LamNI-specific webpage generator subclass.
Integration (inside the LamNI __init__ / startup):
---------------------------------------------------
from csaxs_bec.bec_ipython_client.plugins.LamNI.webpage_generator import (
LamniWebpageGenerator,
)
self._webpage_gen = LamniWebpageGenerator(
bec_client=client,
output_dir="~/data/raw/webpage/",
)
self._webpage_gen.start()
Or use the factory (auto-selects by session name "lamni"):
----------------------------------------------------------
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
make_webpage_generator,
)
self._webpage_gen = make_webpage_generator(bec, output_dir="~/data/raw/webpage/")
self._webpage_gen.start()
Interactive helpers:
--------------------
lamni._webpage_gen.status()
lamni._webpage_gen.verbosity = 2
lamni._webpage_gen.stop()
lamni._webpage_gen.start()
"""
from pathlib import Path
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
WebpageGeneratorBase,
_safe_get,
_safe_float,
_gvar,
)
class LamniWebpageGenerator(WebpageGeneratorBase):
"""
LamNI-specific webpage generator.
Logo: LamNI.png from the same directory as this file.
Override _collect_setup_data() to add LamNI-specific temperatures,
sample name, and measurement settings.
"""
# TODO: fill in LamNI-specific device paths
# label -> dotpath under device_manager.devices
_TEMP_MAP = {
# "Sample": "lamni_temphum.temperature_sample",
# "OSA": "lamni_temphum.temperature_osa",
}
def _logo_path(self):
return Path(__file__).parent / "LamNI.png"
def _collect_setup_data(self) -> dict:
# ── LamNI-specific data goes here ─────────────────────────────
# Uncomment and adapt when device names are known:
#
# dm = self._bec.device_manager
# sample_name = _safe_get(dm, "lamni_samples.sample_names.sample0") or "N/A"
# temperatures = {
# label: _safe_float(_safe_get(dm, path))
# for label, path in self._TEMP_MAP.items()
# }
# settings = {
# "Sample name": sample_name,
# "FOV x / y": ...,
# "Exposure time": _gvar(self._bec, "tomo_countingtime", ".3f", " s"),
# "Angle step": _gvar(self._bec, "tomo_angle_stepsize", ".2f", "\u00b0"),
# }
# return {
# "type": "lamni",
# "sample_name": sample_name,
# "temperatures": temperatures,
# "settings": settings,
# }
# Placeholder — returns minimal info until implemented
return {
"type": "lamni",
# LamNI-specific data here
}

View File

@@ -70,7 +70,7 @@ DLPCA200_AMPLIFIER_CONFIG: dict[str, dict] = {
"rio_device": "galilrioesxbox",
"description": "Beam Position Monitor 4 current amplifier",
"channels": {
"gain_lsb": 0, # Pin 10 -> Galil ch0
"gain_lsb": rio_optics.analog_in.ch0, # Pin 10 -> Galil ch0
"gain_mid": 1, # Pin 11 -> Galil ch1
"gain_msb": 2, # Pin 12 -> Galil ch2
"coupling": 3, # Pin 13 -> Galil ch3

View File

@@ -21,6 +21,14 @@ from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import (
TomoIDManager,
)
# from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
# FlomniWebpageGenerator,
# VERBOSITY_SILENT, # 0 — no output
# VERBOSITY_NORMAL, # 1 — startup / stop messages only (default)
# VERBOSITY_VERBOSE, # 2 — one-line summary per cycle
# VERBOSITY_DEBUG, # 3 — full JSON payload per cycle
# )
logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
@@ -1303,6 +1311,18 @@ class Flomni(
self.corr_angle_y_2 = []
self._progress_proxy = _ProgressProxy(self.client)
self._progress_proxy.reset()
from csaxs_bec.bec_ipython_client.plugins.flomni.flomni_webpage_generator import (
FlomniWebpageGenerator,
)
self._webpage_gen = FlomniWebpageGenerator(
bec_client=client,
output_dir="~/data/raw/webpage/",
#upload_url="http://s1090968537.online.de/upload.php", # optional
upload_url="https://v1p0zyg2w9n2k9c1.myfritz.net/upload.php",
local_port=8080
)
self._webpage_gen.start()
self.OMNYTools = OMNYTools(self.client)
self.reconstructor = PtychoReconstructor(self.ptycho_reconstruct_foldername)
self.tomo_id_manager = TomoIDManager()
@@ -1809,20 +1829,20 @@ class Flomni(
or (self.tomo_type == 3 and projection_number == None)
):
# pylint: disable=undefined-variable
# if bec.active_account != "":
# self.tomo_id = self.add_sample_database(
# self.sample_name,
# str(datetime.date.today()),
# bec.active_account,
# bec.queue.next_scan_number,
# "flomni",
# "test additional info",
# "BEC",
# )
# self.write_pdf_report()
# else:
self.tomo_id = 0
#pylint: disable=undefined-variable
if bec.active_account != "":
self.tomo_id = self.add_sample_database(
self.sample_name,
str(datetime.date.today()),
bec.active_account,
bec.queue.next_scan_number,
"flomni",
"test additional info",
"BEC",
)
self.write_pdf_report()
else:
self.tomo_id = 0
self.write_pdf_report()
self.progress["tomo_start_time"] = datetime.datetime.now().isoformat()
@@ -2249,8 +2269,8 @@ class Flomni(
+ ' 888 888 "Y88888P" 888 888 888 Y888 8888888 \n'
)
padding = 20
fovxy = f"{self.fovx:.2f}/{self.fovy:.2f}"
stitching = f"{self.stitch_x:.2f}/{self.stitch_y:.2f}"
fovxy = f"{self.fovx:.1f}/{self.fovy:.1f}"
stitching = f"{self.stitch_x:.0f}/{self.stitch_y:.0f}"
dataset_id = str(self.client.queue.next_dataset_number)
account = bec.active_account
content = [
@@ -2267,7 +2287,7 @@ class Flomni(
f"{'Exposure time:':<{padding}}{self.tomo_countingtime:>{padding}.2f}\n",
f"{'Fermat spiral step size:':<{padding}}{self.tomo_shellstep:>{padding}.2f}\n",
f"{'FOV:':<{padding}}{fovxy:>{padding}}\n",
f"{'Stitching:':<{padding}}{stitching:>{padding}.0f}\n",
f"{'Stitching:':<{padding}}{stitching:>{padding}}\n",
f"{'Number of individual sub-tomograms:':<{padding}}{8:>{padding}}\n",
f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n",
]

File diff suppressed because it is too large Load Diff

View File

@@ -234,9 +234,10 @@ class TomoIDManager:
)
"""
OMNY_URL = "https://omny.web.psi.ch/samples/newmeasurement.php"
OMNY_USER = "omny"
OMNY_PASSWORD = "samples"
#OMNY_URL = "https://omny.web.psi.ch/samples/newmeasurement.php"
OMNY_URL = "https://v1p0zyg2w9n2k9c1.myfritz.net/samples/newmeasurement.php"
OMNY_USER = ""
OMNY_PASSWORD = ""
TMP_FILE = "/tmp/currsamplesnr.txt"
def register(
@@ -273,9 +274,14 @@ class TomoIDManager:
f"&additional={additional_info}"
f"&user={user}"
)
# subprocess.run(
# f"wget --user={self.OMNY_USER} --password={self.OMNY_PASSWORD}"
# f" -q -O {self.TMP_FILE} '{url}'",
# shell=True,
# )
#print(url)
subprocess.run(
f"wget --user={self.OMNY_USER} --password={self.OMNY_PASSWORD}"
f" -q -O {self.TMP_FILE} '{url}'",
f"wget -q -O {self.TMP_FILE} '{url}'",
shell=True,
)
with open(self.TMP_FILE) as f:

View File

@@ -0,0 +1,96 @@
"""
omny/webpage_generator.py
==========================
OMNY-specific webpage generator subclass.
Integration (inside the OMNY __init__ / startup):
--------------------------------------------------
from csaxs_bec.bec_ipython_client.plugins.omny.webpage_generator import (
OmnyWebpageGenerator,
)
self._webpage_gen = OmnyWebpageGenerator(
bec_client=client,
output_dir="~/data/raw/webpage/",
)
self._webpage_gen.start()
Or use the factory (auto-selects by session name "omny"):
---------------------------------------------------------
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
make_webpage_generator,
)
self._webpage_gen = make_webpage_generator(bec, output_dir="~/data/raw/webpage/")
self._webpage_gen.start()
Interactive helpers:
--------------------
omny._webpage_gen.status()
omny._webpage_gen.verbosity = 2
omny._webpage_gen.stop()
omny._webpage_gen.start()
"""
from pathlib import Path
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
WebpageGeneratorBase,
_safe_get,
_safe_float,
_gvar,
)
class OmnyWebpageGenerator(WebpageGeneratorBase):
"""
OMNY-specific webpage generator.
Logo: OMNY.png from the same directory as this file.
Override _collect_setup_data() to add OMNY-specific temperatures,
sample name, and measurement settings.
The old OMNY spec webpage showed:
- Cryo temperatures (XOMNY-TEMP-CRYO-A/B)
- Per-channel temperatures (XOMNY-TEMP1..48)
- Dewar pressure / LN2 flow
- Interferometer strengths (OINTERF)
Map these to BEC device paths below once available.
"""
# TODO: fill in OMNY-specific device paths
# label -> dotpath under device_manager.devices
_TEMP_MAP = {
# "Sample (cryo A)": "omny_temp.cryo_a",
# "Cryo head (B)": "omny_temp.cryo_b",
}
def _logo_path(self):
return Path(__file__).parent / "OMNY.png"
def _collect_setup_data(self) -> dict:
# ── OMNY-specific data goes here ──────────────────────────────
# Uncomment and adapt when device names are known:
#
# dm = self._bec.device_manager
# sample_name = _safe_get(dm, "omny_samples.sample_names.sample0") or "N/A"
# temperatures = {
# label: _safe_float(_safe_get(dm, path))
# for label, path in self._TEMP_MAP.items()
# }
# settings = {
# "Sample name": sample_name,
# "FOV x / y": ...,
# "Exposure time": _gvar(self._bec, "tomo_countingtime", ".3f", " s"),
# "Angle step": _gvar(self._bec, "tomo_angle_stepsize", ".2f", "\u00b0"),
# }
# return {
# "type": "omny",
# "sample_name": sample_name,
# "temperatures": temperatures,
# "settings": settings,
# }
# Placeholder — returns minimal info until implemented
return {
"type": "omny",
# OMNY-specific data here
}

View File

@@ -72,7 +72,7 @@ xbpm3x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -95,7 +95,7 @@ xbpm3y:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -118,7 +118,7 @@ sl3trxi:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -141,7 +141,7 @@ sl3trxo:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -164,7 +164,7 @@ sl3trxb:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -187,7 +187,7 @@ sl3trxt:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -210,7 +210,7 @@ fast_shutter_n1_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -234,7 +234,7 @@ fast_shutter_o1_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -257,7 +257,7 @@ fast_shutter_o2_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -280,7 +280,7 @@ filter_array_1_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -303,7 +303,7 @@ filter_array_2_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -326,7 +326,7 @@ filter_array_3_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -349,7 +349,7 @@ filter_array_4_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -372,7 +372,7 @@ sl4trxi:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -395,7 +395,7 @@ sl4trxo:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -418,7 +418,7 @@ sl4trxb:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -441,7 +441,7 @@ sl4trxt:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -466,7 +466,7 @@ sl5trxi:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -489,7 +489,7 @@ sl5trxo:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -512,7 +512,7 @@ sl5trxb:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -535,7 +535,7 @@ sl5trxt:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -544,6 +544,66 @@ sl5trxt:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 5
sl5ch:
description: ESbox1 slit 5 center horizontal
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitCenter
deviceConfig:
left_slit: sl5trxi
right_slit: sl5trxo
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxi
- sl5trxo
sl5wh:
description: ESbox1 slit 5 width horizontal
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitWidth
deviceConfig:
left_slit: sl5trxi
right_slit: sl5trxo
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxi
- sl5trxo
sl5cv:
description: ESbox1 slit 5 center vertical
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitCenter
deviceConfig:
left_slit: sl5trxb
right_slit: sl5trxt
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxb
- sl5trxt
sl5wv:
description: ESbox1 slit 5 width vertical
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitWidth
deviceConfig:
left_slit: sl5trxb
right_slit: sl5trxt
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxb
- sl5trxt
xbimtrx:
description: ESbox2 beam intensity monitor x movement
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
@@ -558,7 +618,7 @@ xbimtrx:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -581,7 +641,7 @@ xbimtry:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -589,3 +649,295 @@ xbimtry:
init_position: 0
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 1
################### XBOX related ###################
# we assue the epics settings for resolution, velocity etc. are correct
# we do not overwrite from here
aptrx:
description: Aperture pinhole X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-PIN1:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
aptry:
description: Aperture pinhole Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-PIN1:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
ebtrx:
description: Exposure box aperture X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-EB:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
ebtry:
description: Exposure box aperture Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-EB:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
ebtrz:
description: Exposure box aperture Z
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-EB:TRZ1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
ebsupport:
description: Exposure box granite support Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-EH1-EB:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
fttrx1:
description: FTS1 translation X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-FTS1:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
fttry1:
description: FTS1 translation Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-FTS1:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
fttrx2:
description: FTS2 translation X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-FTS2:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
fttry2:
description: FTS2 translation Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-FTS2:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
fttrz:
description: FTS1 translation Z
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-FTS1:TRZ1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
bs1x:
description: Beamstop 1 X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-BS1:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
bs1y:
description: Beamstop 1 Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-BS1:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
bs2x:
description: Beamstop 2 X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-BS2:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
bs2y:
description: Beamstop 2 Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-BS2:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
dttrx:
description: Detector table X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-DETT:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
dttry:
description: Detector table Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-DETT:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
dttrz:
description: Detector table Z
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-DETT:TRZ1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
dettrx:
description: Detector 1 X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-DET1:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
####################
### Beamstop diode control for flight tube
####################
beamstop_gain_control:
description: Gain control for beamstop flightube
deviceClass: csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl
deviceConfig:
gain_lsb: galilrioesft.digital_out.ch0 # Pin 10 -> Galil ch0
gain_mid: galilrioesft.digital_out.ch1 # Pin 11 -> Galil ch1
gain_msb: galilrioesft.digital_out.ch2 # Pin 12 -> Galil ch2
coupling: galilrioesft.digital_out.ch3 # Pin 13 -> Galil ch3
speed_mode: galilrioesft.digital_out.ch4 # Pin 14 -> Galil ch4
enabled: true
readoutPriority: baseline
onFailure: retry
needs:
- galilrioesft
galilrioesft:
description: Galil RIO for remote gain switching and slow reading FlightTube
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
deviceConfig:
host: galilrioesft.psi.ch
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
beamstop_dummy_bpm:
description: BPM Xbox 2 (First Xbox in ES hutch)
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
deviceConfig:
left_top: galilrioesft.analog_in.ch0
right_top: galilrioesft.analog_in.ch1
right_bot: galilrioesft.analog_in.ch2
left_bot: galilrioesft.analog_in.ch3
enabled: true
readoutPriority: baseline
onFailure: retry
needs:
- galilrioesft
beamstop_intensity:
description: Beamstop intensity from Galil analog input ch6
deviceClass: csaxs_bec.devices.pseudo_devices.signal_forwarder.SignalForwarder
deviceConfig:
signal: galilrioesft.analog_in.ch6
enabled: true
readoutPriority: baseline
onFailure: retry
needs:
- galilrioesft

View File

@@ -199,6 +199,25 @@ xbpm1c4:
readOnly: true
softwareTrigger: false
bpm1:
description: 'XBPM1 (frontend)'
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
deviceConfig:
left_top: xbpm1c1
right_top: xbpm1c2
right_bot: xbpm1c3
left_bot: xbpm1c4
onFailure: raise
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
needs:
- xbpm1c1
- xbpm1c2
- xbpm1c3
- xbpm1c4
############################################
######### End of xbpm sub devices ##########
############################################

View File

@@ -68,91 +68,110 @@ ccmx:
- cSAXS
- optics
# TO BE REVIEWED, REMOVE VELOCITY WITH NEW CLASS!
ccm_energy:
description: 'test'
deviceClass: ophyd_devices.devices.simple_positioner.PSISimplePositioner
deviceConfig:
prefix: 'X12SA-OP-CCM1:'
override_suffixes:
user_readback: "ENERGY-GET"
user_setpoint: "ENERGY-SET"
velocity: "ROTY.VELO"
motor_done_move: "ROTY.DMOV"
onFailure: buffer
enabled: true
readoutPriority: baseline
readOnly: false
softwareTrigger: false
##########################################################################
######################## SMARACT STAGES ##################################
##########################################################################
xbpm2x:
description: X-ray beam position monitor 1 in OPbox
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: A
host: x12sa-eb-smaract-mcs-03.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 0
# xbpm2x:
# description: X-ray beam position monitor 1 in OPbox
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
# deviceConfig:
# axis_Id: A
# host: x12sa-eb-smaract-mcs-03.psi.ch
# limits:
# - -200
# - 200
# port: 5000
# sign: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: baseline
# connectionTimeout: 20
# userParameter:
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
# bl_smar_stage: 0
xbpm2y:
description: X-ray beam position monitor 1 in OPbox
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: B
host: x12sa-eb-smaract-mcs-03.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 1
# xbpm2y:
# description: X-ray beam position monitor 1 in OPbox
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
# deviceConfig:
# axis_Id: B
# host: x12sa-eb-smaract-mcs-03.psi.ch
# limits:
# - -200
# - 200
# port: 5000
# sign: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: baseline
# connectionTimeout: 20
# userParameter:
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
# bl_smar_stage: 1
# cu_foilx:
# description: Cu foil in OPbox
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
# deviceConfig:
# axis_Id: C
# host: x12sa-eb-smaract-mcs-03.psi.ch
# limits:
# - -200
# - 200
# port: 5000
# sign: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: baseline
# connectionTimeout: 20
# userParameter:
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
# bl_smar_stage: 2
# scinx:
# description: scintillator in OPbox
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
# deviceConfig:
# axis_Id: D
# host: x12sa-eb-smaract-mcs-03.psi.ch
# limits:
# - -200
# - 200
# port: 5000
# sign: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: baseline
# connectionTimeout: 20
# userParameter:
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
# bl_smar_stage: 3
cu_foilx:
description: Cu foil in OPbox
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: C
host: x12sa-eb-smaract-mcs-03.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 2
scinx:
description: scintillator in OPbox
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: D
host: x12sa-eb-smaract-mcs-03.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 3
# dmm1_trx_readback_example: # This is the same template as for i.e. bpm4i

View File

@@ -1,8 +1,8 @@
# This is the main configuration file that is
# commented or uncommented according to the type of experiment
# optics:
# - !include ./bl_optics_hutch.yaml
optics:
- !include ./bl_optics_hutch.yaml
# frontend:
# - !include ./bl_frontend.yaml

View File

@@ -395,7 +395,7 @@ rtz:
readoutPriority: on_request
connectionTimeout: 20
rt_flyer:
rt_positions:
deviceClass: csaxs_bec.devices.omny.rt.rt_flomni_ophyd.RtFlomniFlyer
deviceConfig:
host: mpc2844.psi.ch
@@ -522,6 +522,19 @@ omny_panda:
FMC_IN.VAL2.Min: cap_voltage_fzp_x_min
FMC_IN.VAL2.Max: cap_voltage_fzp_x_max
FMC_IN.VAL2.Mean: cap_voltage_fzp_x_mean
INENC1.VAL.Max: interf_st_fzp_y_max
INENC1.VAL.Mean: interf_st_fzp_y_mean
INENC1.VAL.Min: interf_st_fzp_y_min
INENC2.VAL.Max: interf_st_fzp_x_max
INENC2.VAL.Mean: interf_st_fzp_x_mean
INENC2.VAL.Min: interf_st_fzp_x_min
INENC3.VAL.Max: interf_st_rotz_max
INENC3.VAL.Mean: interf_st_rotz_mean
INENC3.VAL.Min: interf_st_rotz_min
INENC4.VAL.Max: interf_st_rotx_max
INENC4.VAL.Mean: interf_st_rotx_mean
INENC4.VAL.Min: interf_st_rotx_min
PCAP.GATE_DURATION.Value: pcap_gate_duration_value
deviceTags:
- detector
enabled: true

View File

@@ -0,0 +1,24 @@
galilrioesxbox:
description: Galil RIO for remote gain switching and slow reading ES XBox
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
deviceConfig:
host: galilrioesft.psi.ch
enabled: true
onFailure: raise
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
bpm1:
readoutPriority: baseline
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
deviceConfig:
blade_t: galilrioesxbox.analog_in.ch0
blade_r: galilrioesxbox.analog_in.ch1
blade_b: galilrioesxbox.analog_in.ch2
blade_l: galilrioesxbox.analog_in.ch3
enabled: true
readOnly: false
softwareTrigger: true
needs:
- galilrioesxbox

View File

@@ -13,6 +13,14 @@ from ophyd_devices import PSIDeviceBase
logger = bec_logger.logger
class MonitorSignal(Signal):
"""A simple wrapper around ophyd Signal that automatically monitors the signal for changes."""
def __init__(self, *, name, auto_monitor=False, **kwargs):
super().__init__(name=name, **kwargs)
self.auto_monitor = auto_monitor
class OMNYFastShutter(PSIDeviceBase, Device):
"""
Fast Shutter control for OMNY setup. If started with at the beamline, it will expose
@@ -26,7 +34,7 @@ class OMNYFastShutter(PSIDeviceBase, Device):
SUB_VALUE = "value"
_default_sub = SUB_VALUE
shutter = Cpt(Signal, name="shutter")
shutter = Cpt(MonitorSignal, name="shutter", auto_monitor=True)
# -----------------------------------------------------
# User-facing shutter control functions
@@ -48,7 +56,6 @@ class OMNYFastShutter(PSIDeviceBase, Device):
def fshopen(self):
"""Open the fast shutter."""
if self._check_if_cSAXS_shutter_exists_in_config():
self.shutter.put(1)
return self.device_manager.devices["fsh"].fshopen()
else:
self.shutter.put(1)
@@ -56,7 +63,6 @@ class OMNYFastShutter(PSIDeviceBase, Device):
def fshclose(self):
"""Close the fast shutter."""
if self._check_if_cSAXS_shutter_exists_in_config():
self.shutter.put(0)
return self.device_manager.devices["fsh"].fshclose()
else:
self.shutter.put(0)

View File

@@ -0,0 +1,172 @@
"""Module for a BPM pseudo device that computes the position and intensity from the blade signals."""
from ophyd import Component as Cpt
from ophyd import Kind, Signal
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
class BPM(PSIPseudoDeviceBase):
"""BPM positioner pseudo device."""
# Blade signals, a,b,c,d
left_top = Cpt(
BECProcessedSignal,
name="left_top",
model_config=None,
kind=Kind.config,
doc="BPM left_top blade",
)
right_top = Cpt(
BECProcessedSignal,
name="right_top",
model_config=None,
kind=Kind.config,
doc="BPM right_top blade",
)
right_bot = Cpt(
BECProcessedSignal,
name="right_bot",
model_config=None,
kind=Kind.config,
doc="BPM right_bottom blade",
)
left_bot = Cpt(
BECProcessedSignal,
name="left_bot",
model_config=None,
kind=Kind.config,
doc="BPM left_bot blade",
)
# Virtual signals
pos_x = Cpt(
BECProcessedSignal,
name="pos_x",
model_config=None,
kind=Kind.config,
doc="BPM X position, -1 fully left, 1 fully right",
)
pos_y = Cpt(
BECProcessedSignal,
name="pos_y",
model_config=None,
kind=Kind.config,
doc="BPM Y position, -1 fully bottom, 1 fully top",
)
diagonal = Cpt(
BECProcessedSignal,
name="diagonal",
model_config=None,
kind=Kind.config,
doc="BPM diagonal, -1 fully diagonal left_top-right_bot, 1 fully diagonal right_top-left_bot",
)
intensity = Cpt(
BECProcessedSignal,
name="intensity",
model_config=None,
kind=Kind.config,
doc="BPM intensity",
)
def __init__(
self,
name,
left_top: str,
right_top: str,
right_bot: str,
left_bot: str,
device_manager=None,
scan_info=None,
**kwargs,
):
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
# Get all blade signal objects from utility method
signal_t = self.left_top.get_device_object_from_bec(
object_name=left_top, signal_name=self.name, device_manager=device_manager
)
signal_r = self.right_top.get_device_object_from_bec(
object_name=right_top, signal_name=self.name, device_manager=device_manager
)
signal_b = self.right_bot.get_device_object_from_bec(
object_name=right_bot, signal_name=self.name, device_manager=device_manager
)
signal_l = self.left_bot.get_device_object_from_bec(
object_name=left_bot, signal_name=self.name, device_manager=device_manager
)
# Set compute methods for blade signals and virtual signals
self.left_top.set_compute_method(self._compute_blade_signal, signal=signal_t)
self.right_top.set_compute_method(self._compute_blade_signal, signal=signal_r)
self.right_bot.set_compute_method(self._compute_blade_signal, signal=signal_b)
self.left_bot.set_compute_method(self._compute_blade_signal, signal=signal_l)
self.intensity.set_compute_method(
self._compute_intensity,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
left_bot=self.left_bot,
)
self.pos_x.set_compute_method(
self._compute_pos_x,
left_bot=self.left_bot,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
)
self.pos_y.set_compute_method(
self._compute_pos_y,
left_bot=self.left_bot,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
)
self.diagonal.set_compute_method(
self._compute_diagonal,
left_bot=self.left_bot,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
)
def _compute_blade_signal(self, signal: Signal) -> float:
return signal.get()
def _compute_intensity(
self, left_top: Signal, right_top: Signal, right_bot: Signal, left_bot: Signal
) -> float:
intensity = left_top.get() + right_top.get() + right_bot.get() + left_bot.get()
return intensity
def _compute_pos_x(
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
) -> float:
"""X position from -1 to 1, where -1 means beam fully on the left side, 1 means beam fully on the right side."""
sum_left = left_bot.get() + left_top.get()
sum_right = right_top.get() + right_bot.get()
sum_total = sum_left + sum_right
if sum_total == 0:
return 0.0
return (sum_right - sum_left) / sum_total
def _compute_pos_y(
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
) -> float:
"""Y position from -1 to 1, where -1 means beam fully on the bottom side, 1 means beam fully on the top side."""
sum_top = left_top.get() + right_top.get()
sum_bot = right_bot.get() + left_bot.get()
sum_total = sum_top + sum_bot
if sum_total == 0:
return 0.0
return (sum_top - sum_bot) / sum_total
def _compute_diagonal(
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
) -> float:
sum_diag1 = left_bot.get() + right_top.get()
sum_diag2 = left_top.get() + right_bot.get()
sum_total = sum_diag1 + sum_diag2
if sum_total == 0:
return 0.0
return (sum_diag1 - sum_diag2) / sum_total

View File

@@ -0,0 +1,189 @@
"""
Module for controlling the BPM amplifier settings, such as gain and coupling.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Literal
from ophyd import Component as Cpt
from ophyd import Kind
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
from bec_server.device_server.devices.devicemanager import DeviceManagerDS
from ophyd import Signal
_GAIN_BITS_LOW_NOISE: dict[tuple, int] = {
(0, 0, 0): int(1e3),
(0, 0, 1): int(1e4),
(0, 1, 0): int(1e5),
(0, 1, 1): int(1e6),
(1, 0, 0): int(1e7),
(1, 0, 1): int(1e8),
(1, 1, 0): int(1e9),
}
_GAIN_BITS_HIGH_SPEED: dict[tuple, int] = {
(0, 0, 0): int(1e5),
(0, 0, 1): int(1e6),
(0, 1, 0): int(1e7),
(0, 1, 1): int(1e8),
(1, 0, 0): int(1e9),
(1, 0, 1): int(1e10),
(1, 1, 0): int(1e11),
}
_GAIN_TO_BITS: dict[int, tuple] = {}
for _bits, _gain in _GAIN_BITS_LOW_NOISE.items():
_GAIN_TO_BITS[_gain] = (*_bits, True)
for _bits, _gain in _GAIN_BITS_HIGH_SPEED.items():
if _gain not in _GAIN_TO_BITS: # low-noise takes priority
_GAIN_TO_BITS[_gain] = (*_bits, False)
VALID_GAINS = sorted(_GAIN_TO_BITS.keys())
class BPMControl(PSIPseudoDeviceBase):
"""
BPM amplifier control pseudo device. It is responsible for controlling the
gain and coupling for the BPM amplifier. It relies on signals from a device
in BEC to be available. For cSAXS, these are most liikely to be from the
GalilRIO device that controls the BPM amplifier.
Args:
name (str): Name of the pseudo device.
gain_lsb (str): Name of the signal in BEC that controls the LSB
of the gain setting.
gain_mid (str): Name of the signal in BEC that controls the MID
bit of the gain setting.
gain_msb (str): Name of the signal in BEC that controls the MSB
of the gain setting.
coupling (str): Name of the signal in BEC that controls the coupling
setting.
speed_mode (str): Name of the signal in BEC that controls the speed mode
(low-noise vs high-speed) of the amplifier.
"""
USER_ACCESS = ["set_gain", "set_coupling"]
gain = Cpt(
BECProcessedSignal,
name="gain",
model_config=None,
kind=Kind.config,
doc="Gain of the amplifier",
)
coupling = Cpt(
BECProcessedSignal,
name="coupling",
model_config=None,
kind=Kind.config,
doc="Coupling of the amplifier",
)
speed = Cpt(
BECProcessedSignal,
name="speed",
model_config=None,
kind=Kind.config,
doc="Speed of the amplifier",
)
def __init__(
self,
name: str,
gain_lsb: str,
gain_mid: str,
gain_msb: str,
coupling: str,
speed_mode: str,
device_manager: DeviceManagerDS | None = None,
scan_info: ScanInfo | None = None,
**kwargs,
):
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
# First we get all signal objects from BEC using the utility method provided by the BECProcessedSignal class.
self._gain_lsb = self.gain.get_device_object_from_bec(
object_name=gain_lsb, signal_name=self.name, device_manager=device_manager
)
self._gain_mid = self.gain.get_device_object_from_bec(
object_name=gain_mid, signal_name=self.name, device_manager=device_manager
)
self._gain_msb = self.gain.get_device_object_from_bec(
object_name=gain_msb, signal_name=self.name, device_manager=device_manager
)
self._coupling = self.gain.get_device_object_from_bec(
object_name=coupling, signal_name=self.name, device_manager=device_manager
)
self._speed_mode = self.gain.get_device_object_from_bec(
object_name=speed_mode, signal_name=self.name, device_manager=device_manager
)
# Set the compute methods for the virtual signals.
self.gain.set_compute_method(
self._compute_gain,
msb=self._gain_msb,
mid=self._gain_mid,
lsb=self._gain_lsb,
speed_mode=self._speed_mode,
)
self.coupling.set_compute_method(self._compute_coupling, coupling=self._coupling)
self.speed.set_compute_method(self._compute_speed, speed=self._speed_mode)
def set_gain(
self,
gain: Literal[
1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000, 100000000000
],
) -> None:
"""
Set the gain of the amplifier.
Args:
gain (Literal): Must be one of 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000.
"""
gain_int = int(gain)
if gain_int not in VALID_GAINS:
raise ValueError(
f"{self.name} received invalid gain {gain_int}, must be in {VALID_GAINS}"
)
msb, mid, lsb, use_low_noise = _GAIN_TO_BITS[gain_int]
self._gain_msb.set(bool(msb)).wait(timeout=2)
self._gain_lsb.set(bool(lsb)).wait(timeout=2)
self._gain_mid.set(bool(mid)).wait(timeout=2)
self._speed_mode.set(bool(use_low_noise))
def set_coupling(self, coupling: Literal["AC", "DC"]) -> None:
"""
Set the coupling of the amplifier.
Args:
coupling (Literal): Must be either "AC" or "DC".
"""
if coupling not in ["AC", "DC"]:
raise ValueError(
f"{self.name} received invalid coupling value {coupling}, please use 'AC' or 'DC'"
)
self._coupling.set(coupling == "DC").wait(timeout=2)
def _compute_gain(self, msb: Signal, mid: Signal, lsb: Signal, speed_mode: Signal) -> int:
"""Compute the gain based on the bits and speed mode."""
bits = (msb.get(), mid.get(), lsb.get())
speed_mode = speed_mode.get()
if speed_mode:
return _GAIN_BITS_LOW_NOISE.get(bits)
else:
return _GAIN_BITS_HIGH_SPEED.get(bits)
def _compute_coupling(self, coupling: Signal) -> str:
"""Compute the coupling based on the signal."""
return "DC" if coupling.get() else "AC"
def _compute_speed(self, speed: Signal) -> str:
"""Compute the speed based on the signal."""
return "low_speed" if speed.get() else "high_speed"

View File

@@ -0,0 +1 @@
# from ophyd

View File

@@ -0,0 +1,41 @@
"""
Pseudo device that forwards a single BEC signal 1:1.
"""
from ophyd import Component as Cpt
from ophyd import Kind, Signal
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
class SignalForwarder(PSIPseudoDeviceBase):
"""Forward one signal unchanged."""
signal = Cpt(
BECProcessedSignal,
name="signal",
model_config=None,
kind=Kind.config,
doc="Forwarded signal",
)
def __init__(
self,
name,
signal: str,
device_manager=None,
scan_info=None,
**kwargs,
):
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
src = self.signal.get_device_object_from_bec(
object_name=signal,
signal_name=self.name,
device_manager=device_manager,
)
self.signal.set_compute_method(self._compute_signal, signal=src)
def _compute_signal(self, signal: Signal) -> float:
return signal.get()

View File

@@ -1 +1 @@
from .csaxs_nexus import NeXus_format as cSAXS_NeXus_format
from .csaxs_nexus import cSAXSNeXusFormat

View File

@@ -1,445 +1,470 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any
import numpy as np
if TYPE_CHECKING:
from bec_lib.devicemanager import DeviceManagerBase
from bec_server.file_writer.file_writer import HDF5Storage
from bec_server.file_writer.default_writer import DefaultFormat
def get_entry(data: dict, name: str, default=None) -> Any:
class cSAXSNeXusFormat(DefaultFormat):
"""
Get an entry from the scan data assuming a <device>.<device>.value structure.
NeXus file format for the cSAXS beamline (BEC era).
Args:
data (dict): Scan data
name (str): Entry name
default (Any, optional): Default value. Defaults to None.
Mirrors the old SPEC layout.xml hierarchy and adds the flOMNI instrument
group for the nano-positioning stage used in ptychography.
Device resilience
-----------------
Every device read (self.get_entry / device call) is wrapped in try/except.
If a device is removed from the BEC config file between sessions it simply
disappears from the device_manager — the corresponding dataset or link is
silently omitted from the HDF5 file without raising an error. This means
the file structure is additive: re-add the device to the config and the
field reappears automatically on the next scan.
Top-level HDF5 structure
────────────────────────
/entry NXentry (definition = NXptycho)
/sample NXsample ← primary sample group
/entry_ptycho NXentry ← generic ptycho entry
/data_soft NXentry ← convenience Eiger frame links
/control NXmonitor
/instrument NXinstrument
/source
/insertion_device
/monochromator
/XBPM3
/slit_3 … slit_5
/filter_set
/beam_stop_1 … beam_stop_2
/eiger_1_5 NXdetector
/mcs NXdetector
/flOMNI NXpositioner
Device name mapping (old SPEC → current BEC)
────────────────────────────────────────────
samx / samy → samx / samy (generic; kept for non-flOMNI configs)
sl3wh/wv/ch/cv → sl3trxi/o/b/t (individual blade motors; gap/centre TODO)
sl4wh/wv/ch/cv → sl4trxi/o/b/t
sl5wh/wv/ch/cv → sl5trxi/o/b/t
bs1x / bs1y → bs1x / bs1y
bs2x / bs2y → bs2x / bs2y
dettrx → dettrx
eiger_4 → eiger_1_5
mcs → mcs
filter_array → filter_array_1_x … filter_array_4_x
xbpm3 → xbpm3x / xbpm3y (stage positions; signal readouts TODO)
energy → ccm_energy
TODO (devices not yet in BEC list)
───────────────────────────────────
curr, idgap ring current, undulator gap
moth1, mobd monochromator crystal angles
mith, mibd, mirror_coating mirror
bpm3s/x/y/z XBPM3 signal readouts
sl0 / sl1 / sl2 upstream optics-hutch slits
slit gap / centre derived from blade pairs + calibration offset
"""
if isinstance(data.get(name), list) and isinstance(data.get(name)[0], dict):
return [sub_data.get(name, {}).get("value", default) for sub_data in data.get(name)]
return data.get(name, {}).get(name, {}).get("value", default)
# -------------------------------------------------------------------------
# Helpers
# -------------------------------------------------------------------------
def _safe_dataset(self, group, name: str, device: str,
units: str | None = None,
description: str | None = None) -> None:
"""
Write a dataset from the BEC scan data dictionary.
Silently skips if the device was not recorded in this scan
(e.g. removed from config, readoutPriority=on_request and not triggered,
or the scan finished before the device responded).
"""
try:
value = self.get_entry(device)
ds = group.create_dataset(name, data=value)
if units:
ds.attrs["units"] = units
if description:
ds.attrs["description"] = description
except Exception:
pass
def _safe_soft_link(self, group, name: str, target: str) -> None:
"""Create a soft link; silently skip on any error."""
try:
group.create_soft_link(name, target)
except Exception:
pass
def _slit_blades(self, group, prefix: str) -> None:
"""
Store individual blade motor positions for a 4-blade slit set.
Derived quantities (gap, centre) require a per-slit calibration offset
and will be added in a later update.
"""
for blade, motor in [
("inner_x", f"{prefix}trxi"),
("outer_x", f"{prefix}trxo"),
("bottom_y", f"{prefix}trxb"),
("top_y", f"{prefix}trxt"),
]:
self._safe_dataset(group, blade, motor, units="mm")
def NeXus_format(
storage: HDF5Storage, data: dict, file_references: dict, device_manager: DeviceManagerBase
) -> HDF5Storage:
"""
Prepare the NeXus file format.
# -------------------------------------------------------------------------
# Main format method
# -------------------------------------------------------------------------
Args:
storage (HDF5Storage): HDF5 storage. Pseudo hdf5 file container that will be written to disk later.
data (dict): scan data
file_references (dict): File references. Can be used to add external files to the HDF5 file. The path is given relative to the HDF5 file.
device_manager (DeviceManagerBase): Device manager. Can be used to check if devices are available.
def format(self) -> None:
"""Build the NeXus/HDF5 layout for a cSAXS scan."""
Returns:
HDF5Storage: Updated HDF5 storage
"""
# /entry
entry = storage.create_group("entry")
entry.attrs["NX_class"] = "NXentry"
entry.attrs["definition"] = "NXsas"
entry.attrs["start_time"] = data.get("start_time")
entry.attrs["end_time"] = data.get("end_time")
entry.attrs["version"] = 1.0
# Canonical paths referenced by multiple groups
RT_POS_PATH = "/entry/instrument/flOMNI/rt_positions"
EIGER_COLL = "/entry/collection/file_references/eiger_1_5"
# /entry/collection
collection = entry.create_group("collection")
collection.attrs["NX_class"] = "NXcollection"
bec_collection = collection.create_group("bec")
# ── Root entry ────────────────────────────────────────────────────────
entry = self.storage.create_group("entry")
entry.attrs["NX_class"] = "NXentry"
entry.attrs["definition"] = "NXptycho"
# /entry/control
control = entry.create_group("control")
control.attrs["NX_class"] = "NXmonitor"
control.create_dataset(name="mode", data="monitor")
control.create_dataset(name="integral", data=get_entry(data, "bpm4i"))
# ── /entry/sample ─────────────────────────────────────────────────────
# Primary sample group. Contains the name of the mounted sample and a
# link to the real-time scan positions. Generic samx/samy are recorded
# here so the group is meaningful for non-flOMNI configurations too.
sample = entry.create_group("sample")
sample.attrs["NX_class"] = "NXsample"
# Soft-link name directly to the value BEC recorded in the collection.
# Only written when flomni_samples is present; other configs leave name absent.
if "flomni_samples" in self.device_manager.devices:
self._safe_soft_link(
sample, "name",
"/entry/collection/devices/flomni_samples"
"/flomni_samples_sample_names_sample0/value",
)
# Generic coarse stage positions (meaningful in non-flOMNI setups)
self._safe_dataset(sample, "x_translation", "samx", units="mm")
self._safe_dataset(sample, "y_translation", "samy", units="mm")
# Real-time encoder positions — the primary scan coordinate
self._safe_soft_link(sample, "positions", RT_POS_PATH)
# /entry/data
main_data = entry.create_group("data")
main_data.attrs["NX_class"] = "NXdata"
if "eiger_4" in device_manager.devices:
main_data.create_soft_link(name="data", target="/entry/instrument/eiger_4/data")
elif "eiger9m" in device_manager.devices:
main_data.create_soft_link(name="data", target="/entry/instrument/eiger9m/data")
elif "pilatus_2" in device_manager.devices:
main_data.create_soft_link(name="data", target="/entry/instrument/pilatus_2/data")
# ── /entry/entry_ptycho ───────────────────────────────────────────────
# Generic ptychography entry. Detector data and scan positions are
# linked in from the instrument groups so this entry is self-contained
# for downstream reconstruction codes.
entry_ptycho = entry.create_group("entry_ptycho")
entry_ptycho.attrs["NX_class"] = "NXentry"
entry_ptycho.attrs["definition"] = "NXptycho"
# /entry/sample
control = entry.create_group("sample")
control.attrs["NX_class"] = "NXsample"
control.create_dataset(name="name", data=get_entry(data, "samplename"))
control.create_dataset(name="description", data=data.get("sample_description"))
x_translation = control.create_dataset(name="x_translation", data=get_entry(data, "samx"))
x_translation.attrs["units"] = "mm"
y_translation = control.create_dataset(name="y_translation", data=get_entry(data, "samy"))
y_translation.attrs["units"] = "mm"
temperature_log = control.create_dataset(name="temperature_log", data=get_entry(data, "temp"))
temperature_log.attrs["units"] = "K"
nxdata = entry_ptycho.create_group("data")
nxdata.attrs["NX_class"] = "NXdata"
nxdata.attrs["signal"] = "data"
# Detector frames
try:
for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys():
self._safe_soft_link(nxdata, k, f"{EIGER_COLL}/{k}")
except Exception:
pass
# Scan positions
self._safe_soft_link(nxdata, "positions", RT_POS_PATH)
# /entry/instrument
instrument = entry.create_group("instrument")
instrument.attrs["NX_class"] = "NXinstrument"
instrument.create_dataset(name="name", data="cSAXS beamline")
# Link to the primary sample group
self._safe_soft_link(entry_ptycho, "sample", "/entry/sample")
source = instrument.create_group("source")
source.attrs["NX_class"] = "NXsource"
source.create_dataset(name="type", data="Synchrotron X-ray Source")
source.create_dataset(name="name", data="Swiss Light Source")
source.create_dataset(name="probe", data="x-ray")
distance = source.create_dataset(
name="distance", data=-33800 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
sigma_x = source.create_dataset(name="sigma_x", data=0.202)
sigma_x.attrs["units"] = "mm"
sigma_y = source.create_dataset(name="sigma_y", data=0.018)
sigma_y.attrs["units"] = "mm"
divergence_x = source.create_dataset(name="divergence_x", data=0.000135)
divergence_x.attrs["units"] = "radians"
divergence_y = source.create_dataset(name="divergence_y", data=0.000025)
divergence_y.attrs["units"] = "radians"
current = source.create_dataset(name="current", data=get_entry(data, "curr"))
current.attrs["units"] = "mA"
# ── /entry/data_soft ──────────────────────────────────────────────────
# Convenience group mirroring the old /entry/data hardlink from layout.xml.
data_soft = entry.create_group("data_soft")
data_soft.attrs["NX_class"] = "NXentry"
try:
for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys():
self._safe_soft_link(data_soft, k, f"{EIGER_COLL}/{k}")
except Exception:
pass
insertion_device = instrument.create_group("insertion_device")
insertion_device.attrs["NX_class"] = "NXinsertion_device"
source.create_dataset(name="type", data="undulator")
gap = source.create_dataset(name="gap", data=get_entry(data, "idgap"))
gap.attrs["units"] = "mm"
k = source.create_dataset(name="k", data=2.46)
k.attrs["units"] = "NX_DIMENSIONLESS"
length = source.create_dataset(name="length", data=1820)
length.attrs["units"] = "mm"
# ── /entry/control ────────────────────────────────────────────────────
control = entry.create_group("control")
control.attrs["NX_class"] = "NXmonitor"
control.create_dataset("mode", data="monitor")
# TODO: beam intensity integral — add device when available
# self._safe_dataset(control, "integral", "bpm_sum", units="NX_DIMENSIONLESS")
slit_0 = instrument.create_group("slit_0")
slit_0.attrs["NX_class"] = "NXslit"
source.create_dataset(name="material", data="OFHC Cu")
source.create_dataset(name="description", data="Horizontal secondary source slit")
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl0wh"))
x_gap.attrs["units"] = "mm"
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl0ch"))
x_translation.attrs["units"] = "mm"
distance = source.create_dataset(
name="distance", data=-21700 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
# ── /entry/instrument ─────────────────────────────────────────────────
instrument = entry.create_group("instrument")
instrument.attrs["NX_class"] = "NXinstrument"
instrument.create_dataset("name", data="cSAXS beamline")
slit_1 = instrument.create_group("slit_1")
slit_1.attrs["NX_class"] = "NXslit"
source.create_dataset(name="material", data="OFHC Cu")
source.create_dataset(name="description", data="Horizontal secondary source slit")
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl1wh"))
x_gap.attrs["units"] = "mm"
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl1wv"))
y_gap.attrs["units"] = "mm"
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl1ch"))
x_translation.attrs["units"] = "mm"
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl1ch"))
height.attrs["units"] = "mm"
distance = source.create_dataset(
name="distance", data=-7800 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
# ── Source ────────────────────────────────────────────────────────────
# Numerical values are currently unknown and stored as 0.
# Will be updated once the corresponding devices are in BEC.
source = instrument.create_group("source")
source.attrs["NX_class"] = "NXsource"
source.create_dataset("type", data="Synchrotron X-ray Source")
source.create_dataset("name", data="Swiss Light Source")
source.create_dataset("probe", data="x-ray")
source.create_dataset("sigma_x", data=0.0).attrs["units"] = "mm"
source.create_dataset("sigma_y", data=0.0).attrs["units"] = "mm"
source.create_dataset("divergence_x", data=0.0).attrs["units"] = "radians"
source.create_dataset("divergence_y", data=0.0).attrs["units"] = "radians"
# TODO: current — add device when available
# self._safe_dataset(source, "current", "curr", units="mA")
mono = instrument.create_group("monochromator")
mono.attrs["NX_class"] = "NXmonochromator"
mokev = data.get("mokev", {})
if mokev:
if isinstance(mokev, list):
mokev = mokev[0]
wavelength = mono.create_dataset(
name="wavelength", data=12.3984193 / (mokev.get("mokev").get("value") + 1e-9)
# ── Insertion device ──────────────────────────────────────────────────
insertion_device = instrument.create_group("insertion_device")
insertion_device.attrs["NX_class"] = "NXinsertion_device"
insertion_device.create_dataset("type", data="undulator")
insertion_device.create_dataset("k", data=0.0)
insertion_device.create_dataset("length", data=0.0).attrs["units"] = "mm"
# TODO: gap — add device when available
# self._safe_dataset(insertion_device, "gap", "idgap", units="mm")
# ── Monochromator ─────────────────────────────────────────────────────
# ccm_energy is a baseline device and is recorded in the scan data.
mono = instrument.create_group("monochromator")
mono.attrs["NX_class"] = "NXmonochromator"
mono.create_dataset("type", data="Double crystal fixed exit monochromator.")
try:
energy_kev = self.get_entry("ccm_energy")
energy_arr = np.asarray(energy_kev, dtype=float)
en_ds = mono.create_dataset("energy", data=energy_arr)
en_ds.attrs["units"] = "keV"
with np.errstate(divide="ignore", invalid="ignore"):
wavelength = np.where(energy_arr != 0, 12.3984193 / energy_arr, 0.0)
wl_ds = mono.create_dataset("wavelength", data=wavelength)
wl_ds.attrs["units"] = "Angstrom"
except Exception:
pass
# TODO: crystal angles — add moth1 / mobd when available
# crystal_1 = mono.create_group("crystal_1")
# crystal_1.attrs["NX_class"] = "NXcrystal"
# crystal_1.create_dataset("usage", data="Bragg")
# crystal_1.create_dataset("type", data="Si")
# crystal_1.create_dataset("order_no", data=1.0)
# crystal_1.create_dataset("reflection", data="[1 1 1]")
# self._safe_dataset(crystal_1, "bragg_angle", "moth1", units="degrees")
# crystal_2 = mono.create_group("crystal_2")
# crystal_2.attrs["NX_class"] = "NXcrystal"
# crystal_2.create_dataset("usage", data="Bragg")
# crystal_2.create_dataset("type", data="Si")
# crystal_2.create_dataset("order_no", data=2.0)
# crystal_2.create_dataset("reflection", data="[1 1 1]")
# self._safe_dataset(crystal_2, "bragg_angle", "moth1", units="degrees")
# self._safe_dataset(crystal_2, "bend_x", "mobd", units="degrees")
# ── Mirror ────────────────────────────────────────────────────────────
# TODO: mith, mibd, mirror_coating not yet in device list
# mirror = instrument.create_group("mirror")
# mirror.attrs["NX_class"] = "NXmirror"
# mirror.create_dataset("type", data="single")
# mirror.create_dataset(
# "description",
# data=(
# "Grazing incidence mirror to reject high-harmonic wavelengths. "
# "Three coating options: no coating (SiO2), rhodium (Rh), platinum (Pt)."
# ),
# )
# mirror.create_dataset("substrate_material", data="SiO2")
# self._safe_dataset(mirror, "incident_angle", "mith", units="degrees")
# self._safe_dataset(mirror, "coating_material", "mirror_coating", units="NX_CHAR")
# self._safe_dataset(mirror, "bend_y", "mibd", units="NX_DIMENSIONLESS")
# ── Upstream slits (optics hutch) ─────────────────────────────────────
# TODO: slit_0 / slit_1 / slit_2 motors not yet in BEC device list
# slit_0 = instrument.create_group("slit_0")
# ...
# slit_1 = instrument.create_group("slit_1")
# ...
# slit_2 = instrument.create_group("slit_2")
# ...
# ── XBPM3 ─────────────────────────────────────────────────────────────
# xbpm3x/xbpm3y are stage motor positions for aligning the monitor.
# Signal readouts (sum/x/y/skew) are TODO once MCS channels are mapped.
xbpm3 = instrument.create_group("XBPM3")
xbpm3.attrs["NX_class"] = "NXdetector"
xbpm3.attrs["description"] = "X-ray beam position monitor 3, experimental hutch"
self._safe_dataset(xbpm3, "x_stage", "xbpm3x", units="mm",
description="XBPM3 stage x-translation")
self._safe_dataset(xbpm3, "y_stage", "xbpm3y", units="mm",
description="XBPM3 stage y-translation")
# TODO: signal readout sub-groups once MCS channels are configured
# for suffix, entry_name, desc in [
# ("sum", "bpm3s", "Sum of counts for the four quadrants."),
# ("x", "bpm3x", "Normalized diff, left vs right quadrants."),
# ("y", "bpm3y", "Normalized diff, high vs low quadrants."),
# ("skew", "bpm3z", "Normalized diff, diagonal quadrants."),
# ]:
# g = xbpm3.create_group(f"XBPM3_{suffix}")
# self._safe_dataset(g, "data", entry_name, units="NX_DIMENSIONLESS")
# g.create_dataset("description", data=desc)
# ── Slit 3 (experimental hutch, exposure box) ─────────────────────────
slit_3 = instrument.create_group("slit_3")
slit_3.attrs["NX_class"] = "NXslit"
slit_3.create_dataset("material", data="Si")
slit_3.create_dataset("description", data="Slit 3, experimental hutch, exposure box")
# TODO: gap / centre require per-slit calibration offset — add later
self._slit_blades(slit_3, "sl3")
# ── Filter set ────────────────────────────────────────────────────────
filter_set = instrument.create_group("filter_set")
filter_set.attrs["NX_class"] = "NXattenuator"
filter_set.create_dataset("material", data="Si")
filter_set.create_dataset(
"description",
data=(
"Four linear filter stages (filter_array_1_x … filter_array_4_x). "
"Each stage has five filter positions plus an 'out' position."
),
)
wavelength.attrs["units"] = "Angstrom"
energy = mono.create_dataset(name="energy", data=mokev.get("mokev").get("value"))
energy.attrs["units"] = "keV"
mono.create_dataset(name="type", data="Double crystal fixed exit monochromator.")
distance = mono.create_dataset(
name="distance", data=-5220 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
for i in range(1, 5):
self._safe_dataset(filter_set, f"stage_{i}_x",
f"filter_array_{i}_x", units="mm")
# TODO: attenuator_transmission = 10^(ftrans) once device is available
crystal_1 = mono.create_group("crystal_1")
crystal_1.attrs["NX_class"] = "NXcrystal"
crystal_1.create_dataset(name="usage", data="Bragg")
crystal_1.create_dataset(name="order_no", data="1")
crystal_1.create_dataset(name="reflection", data="[1 1 1]")
bragg_angle = crystal_1.create_dataset(name="bragg_angle", data=get_entry(data, "moth1"))
bragg_angle.attrs["units"] = "degrees"
# ── Slit 4 (experimental hutch, exposure box) ─────────────────────────
slit_4 = instrument.create_group("slit_4")
slit_4.attrs["NX_class"] = "NXslit"
slit_4.create_dataset("material", data="Ge")
slit_4.create_dataset("description", data="Slit 4, experimental hutch, exposure box")
self._slit_blades(slit_4, "sl4")
crystal_2 = mono.create_group("crystal_2")
crystal_2.attrs["NX_class"] = "NXcrystal"
crystal_2.create_dataset(name="usage", data="Bragg")
crystal_2.create_dataset(name="order_no", data="2")
crystal_2.create_dataset(name="reflection", data="[1 1 1]")
bragg_angle = crystal_2.create_dataset(name="bragg_angle", data=get_entry(data, "moth1"))
bragg_angle.attrs["units"] = "degrees"
bend_x = crystal_2.create_dataset(name="bend_x", data=get_entry(data, "mobd"))
bend_x.attrs["units"] = "degrees"
# ── Slit 5 (experimental hutch, exposure box) ─────────────────────────
slit_5 = instrument.create_group("slit_5")
slit_5.attrs["NX_class"] = "NXslit"
slit_5.create_dataset("material", data="Si")
slit_5.create_dataset("description", data="Slit 5, experimental hutch, exposure box")
self._slit_blades(slit_5, "sl5")
xbpm4 = instrument.create_group("XBPM4")
xbpm4.attrs["NX_class"] = "NXdetector"
xbpm4_sum = xbpm4.create_group("XBPM4_sum")
xbpm4_sum_data = xbpm4_sum.create_dataset(name="data", data=get_entry(data, "bpm4s"))
xbpm4_sum_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm4_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.")
xbpm4_x = xbpm4.create_group("XBPM4_x")
xbpm4_x_data = xbpm4_x.create_dataset(name="data", data=get_entry(data, "bpm4x"))
xbpm4_x_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm4_x.create_dataset(
name="description", data="Normalized difference of counts between left and right quadrants."
)
xbpm4_y = xbpm4.create_group("XBPM4_y")
xbpm4_y_data = xbpm4_y.create_dataset(name="data", data=get_entry(data, "bpm4y"))
xbpm4_y_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm4_y.create_dataset(
name="description", data="Normalized difference of counts between high and low quadrants."
)
xbpm4_skew = xbpm4.create_group("XBPM4_skew")
xbpm4_skew_data = xbpm4_skew.create_dataset(name="data", data=get_entry(data, "bpm4z"))
xbpm4_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm4_skew.create_dataset(
name="description", data="Normalized difference of counts between diagonal quadrants."
)
# ── Beam stop 1 ────────────────────────────────────────────────────────
beam_stop_1 = instrument.create_group("beam_stop_1")
beam_stop_1.attrs["NX_class"] = "NXbeam_stop"
beam_stop_1.create_dataset("description", data="circular")
beam_stop_1.create_dataset("size", data=3.0).attrs["units"] = "mm"
self._safe_dataset(beam_stop_1, "x", "bs1x", units="mm")
self._safe_dataset(beam_stop_1, "y", "bs1y", units="mm")
# TODO: diode signal behind beam stop 1 when device is available
mirror = instrument.create_group("mirror")
mirror.attrs["NX_class"] = "NXmirror"
mirror.create_dataset(name="type", data="single")
mirror.create_dataset(
name="description",
data="Grazing incidence mirror to reject high-harmonic wavelengths from the monochromator. There are three coating options available that are used depending on the X-ray energy, no coating (SiO2), rhodium (Rh) or platinum (Pt).",
)
incident_angle = mirror.create_dataset(name="incident_angle", data=get_entry(data, "mith"))
incident_angle.attrs["units"] = "degrees"
substrate_material = mirror.create_dataset(name="substrate_material", data="SiO2")
substrate_material.attrs["units"] = "NX_CHAR"
coating_material = mirror.create_dataset(name="coating_material", data="SiO2")
coating_material.attrs["units"] = "NX_CHAR"
bend_y = mirror.create_dataset(name="bend_y", data="mibd")
bend_y.attrs["units"] = "NX_DIMENSIONLESS"
distance = mirror.create_dataset(
name="distance", data=-4370 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
# ── Beam stop 2 ────────────────────────────────────────────────────────
beam_stop_2 = instrument.create_group("beam_stop_2")
beam_stop_2.attrs["NX_class"] = "NXbeam_stop"
beam_stop_2.create_dataset("description", data="rectangular")
beam_stop_2.create_dataset("size_x", data=5.0).attrs["units"] = "mm"
beam_stop_2.create_dataset("size_y", data=2.25).attrs["units"] = "mm"
self._safe_dataset(beam_stop_2, "x", "bs2x", units="mm")
self._safe_dataset(beam_stop_2, "y", "bs2y", units="mm")
# TODO: diode (transmitted signal) when device is available
xbpm5 = instrument.create_group("XBPM5")
xbpm5.attrs["NX_class"] = "NXdetector"
xbpm5_sum = xbpm5.create_group("XBPM5_sum")
xbpm5_sum_data = xbpm5_sum.create_dataset(name="data", data=get_entry(data, "bpm5s"))
xbpm5_sum_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm5_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.")
xbpm5_x = xbpm5.create_group("XBPM5_x")
xbpm5_x_data = xbpm5_x.create_dataset(name="data", data=get_entry(data, "bpm5x"))
xbpm5_x_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm5_x.create_dataset(
name="description", data="Normalized difference of counts between left and right quadrants."
)
xbpm5_y = xbpm5.create_group("XBPM5_y")
xbpm5_y_data = xbpm5_y.create_dataset(name="data", data=get_entry(data, "bpm5y"))
xbpm5_y_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm5_y.create_dataset(
name="description", data="Normalized difference of counts between high and low quadrants."
)
xbpm5_skew = xbpm5.create_group("XBPM5_skew")
xbpm5_skew_data = xbpm5_skew.create_dataset(name="data", data=get_entry(data, "bpm5z"))
xbpm5_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm5_skew.create_dataset(
name="description", data="Normalized difference of counts between diagonal quadrants."
)
slit_2 = instrument.create_group("slit_2")
slit_2.attrs["NX_class"] = "NXslit"
source.create_dataset(name="material", data="Ag")
source.create_dataset(name="description", data="Slit 2, optics hutch")
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl2wh"))
x_gap.attrs["units"] = "mm"
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl2wv"))
y_gap.attrs["units"] = "mm"
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl2ch"))
x_translation.attrs["units"] = "mm"
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl2cv"))
height.attrs["units"] = "mm"
distance = source.create_dataset(
name="distance", data=-3140 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
slit_3 = instrument.create_group("slit_3")
slit_3.attrs["NX_class"] = "NXslit"
source.create_dataset(name="material", data="Si")
source.create_dataset(name="description", data="Slit 3, experimental hutch, exposure box")
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl3wh"))
x_gap.attrs["units"] = "mm"
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl3wv"))
y_gap.attrs["units"] = "mm"
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl3ch"))
x_translation.attrs["units"] = "mm"
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl3cv"))
height.attrs["units"] = "mm"
# distance = source.create_dataset(name="distance", data=-3140 - get_entry(data, "samz", 0))
# distance.attrs["units"] = "mm"
filter_set = instrument.create_group("filter_set")
filter_set.attrs["NX_class"] = "NXattenuator"
filter_set.create_dataset(name="material", data="Si")
filter_set.create_dataset(
name="description",
data="The filter set consists of 4 linear stages, each with five filter positions. Additionally, each one allows for an out position to allow 'no filtering'.",
)
attenuator_transmission = filter_set.create_dataset(
name="attenuator_transmission", data=10 ** get_entry(data, "ftrans", 0)
)
attenuator_transmission.attrs["units"] = "NX_DIMENSIONLESS"
slit_4 = instrument.create_group("slit_4")
slit_4.attrs["NX_class"] = "NXslit"
source.create_dataset(name="material", data="Si")
source.create_dataset(name="description", data="Slit 4, experimental hutch, exposure box")
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl4wh"))
x_gap.attrs["units"] = "mm"
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl4wv"))
y_gap.attrs["units"] = "mm"
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl4ch"))
x_translation.attrs["units"] = "mm"
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl4cv"))
height.attrs["units"] = "mm"
# distance = source.create_dataset(name="distance", data=-3140 - get_entry(data, "samz", 0))
# distance.attrs["units"] = "mm"
slit_5 = instrument.create_group("slit_5")
slit_5.attrs["NX_class"] = "NXslit"
source.create_dataset(name="material", data="Si")
source.create_dataset(name="description", data="Slit 5, experimental hutch, exposure box")
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl5wh"))
x_gap.attrs["units"] = "mm"
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl5wv"))
y_gap.attrs["units"] = "mm"
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl5ch"))
x_translation.attrs["units"] = "mm"
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl5cv"))
height.attrs["units"] = "mm"
# distance = source.create_dataset(name="distance", data=-3140 - get_entry(data, "samz", 0))
# distance.attrs["units"] = "mm"
beam_stop_1 = instrument.create_group("beam_stop_1")
beam_stop_1.attrs["NX_class"] = "NX_beamstop"
beam_stop_1.create_dataset(name="description", data="circular")
bms1_size = beam_stop_1.create_dataset(name="size", data=3)
bms1_size.attrs["units"] = "mm"
bms1_x = beam_stop_1.create_dataset(name="size", data=get_entry(data, "bs1x"))
bms1_x.attrs["units"] = "mm"
bms1_y = beam_stop_1.create_dataset(name="size", data=get_entry(data, "bs1y"))
bms1_y.attrs["units"] = "mm"
beam_stop_2 = instrument.create_group("beam_stop_2")
beam_stop_2.attrs["NX_class"] = "NX_beamstop"
beam_stop_2.create_dataset(name="description", data="rectangular")
bms2_size_x = beam_stop_2.create_dataset(name="size_x", data=5)
bms2_size_x.attrs["units"] = "mm"
bms2_size_y = beam_stop_2.create_dataset(name="size_y", data=2.25)
bms2_size_y.attrs["units"] = "mm"
bms2_x = beam_stop_2.create_dataset(name="size", data=get_entry(data, "bs2x"))
bms2_x.attrs["units"] = "mm"
bms2_y = beam_stop_2.create_dataset(name="size", data=get_entry(data, "bs2y"))
bms2_y.attrs["units"] = "mm"
bms2_data = beam_stop_2.create_dataset(name="data", data=get_entry(data, "diode"))
bms2_data.attrs["units"] = "NX_DIMENSIONLESS"
if "eiger1p5m" in device_manager.devices and device_manager.devices.eiger1p5m.enabled:
eiger_4 = instrument.create_group("eiger_4")
eiger_4.attrs["NX_class"] = "NXdetector"
x_pixel_size = eiger_4.create_dataset(name="x_pixel_size", data=75)
x_pixel_size.attrs["units"] = "um"
y_pixel_size = eiger_4.create_dataset(name="y_pixel_size", data=75)
y_pixel_size.attrs["units"] = "um"
polar_angle = eiger_4.create_dataset(name="polar_angle", data=0)
polar_angle.attrs["units"] = "degrees"
azimuthal_angle = eiger_4.create_dataset(name="azimuthal_angle", data=0)
azimuthal_angle.attrs["units"] = "degrees"
rotation_angle = eiger_4.create_dataset(name="rotation_angle", data=0)
rotation_angle.attrs["units"] = "degrees"
description = eiger_4.create_dataset(
name="description", data="Single-photon counting detector, 320 micron-thick Si chip"
)
orientation = eiger_4.create_group("orientation")
orientation.attrs["description"] = (
"Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
)
orientation.create_dataset(name="transpose", data=1)
orientation.create_dataset(name="rot90", data=3)
if (
"eiger9m" in device_manager.devices
and device_manager.devices.eiger9m.enabled
and "eiger9m" in file_references
):
eiger9m = instrument.create_group("eiger9m")
eiger9m.attrs["NX_class"] = "NXdetector"
x_pixel_size = eiger9m.create_dataset(name="x_pixel_size", data=75)
x_pixel_size.attrs["units"] = "um"
y_pixel_size = eiger9m.create_dataset(name="y_pixel_size", data=75)
y_pixel_size.attrs["units"] = "um"
polar_angle = eiger9m.create_dataset(name="polar_angle", data=0)
polar_angle.attrs["units"] = "degrees"
azimuthal_angle = eiger9m.create_dataset(name="azimuthal_angle", data=0)
azimuthal_angle.attrs["units"] = "degrees"
rotation_angle = eiger9m.create_dataset(name="rotation_angle", data=0)
rotation_angle.attrs["units"] = "degrees"
description = eiger9m.create_dataset(
name="description", data="Eiger9M detector, in-house developed, Paul Scherrer Institute"
)
orientation = eiger9m.create_group("orientation")
orientation.attrs["description"] = (
"Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
)
orientation.create_dataset(name="transpose", data=1)
orientation.create_dataset(name="rot90", data=3)
data = eiger9m.create_ext_link("data", file_references["eiger9m"]["path"], "EG9M/data")
status = eiger9m.create_ext_link(
"status", file_references["eiger9m"]["path"], "EG9M/status"
# ── Detector translation ───────────────────────────────────────────────
self._safe_dataset(
instrument, "detector_translation_x", "dettrx",
units="mm", description="Detector x-translation stage",
)
if (
"pilatus_2" in device_manager.devices
and device_manager.devices.pilatus_2.enabled
and "pilatus_2" in file_references
):
pilatus_2 = instrument.create_group("pilatus_2")
pilatus_2.attrs["NX_class"] = "NXdetector"
x_pixel_size = pilatus_2.create_dataset(name="x_pixel_size", data=172)
x_pixel_size.attrs["units"] = "um"
y_pixel_size = pilatus_2.create_dataset(name="y_pixel_size", data=172)
y_pixel_size.attrs["units"] = "um"
polar_angle = pilatus_2.create_dataset(name="polar_angle", data=0)
polar_angle.attrs["units"] = "degrees"
azimuthal_angle = pilatus_2.create_dataset(name="azimuthal_angle", data=0)
azimuthal_angle.attrs["units"] = "degrees"
rotation_angle = pilatus_2.create_dataset(name="rotation_angle", data=0)
rotation_angle.attrs["units"] = "degrees"
description = pilatus_2.create_dataset(
name="description", data="Pilatus 300K detector, Dectris, Switzerland"
)
orientation = pilatus_2.create_group("orientation")
orientation.attrs["description"] = (
"Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
)
orientation.create_dataset(name="transpose", data=1)
orientation.create_dataset(name="rot90", data=2)
data = pilatus_2.create_ext_link(
"data", file_references["pilatus_2"]["path"], "entry/instrument/pilatus_2/data"
)
# ── Eiger 1.5M detector ───────────────────────────────────────────────
if (
"eiger_1_5" in self.device_manager.devices
and self.device_manager.devices.eiger_1_5.enabled
and "eiger_1_5" in self.file_references
):
eiger = instrument.create_group("eiger_1_5")
eiger.attrs["NX_class"] = "NXdetector"
eiger.create_dataset("x_pixel_size", data=75.0).attrs["units"] = "um"
eiger.create_dataset("y_pixel_size", data=75.0).attrs["units"] = "um"
eiger.create_dataset("polar_angle", data=0.0).attrs["units"] = "degrees"
eiger.create_dataset("azimuthal_angle", data=0.0).attrs["units"] = "degrees"
eiger.create_dataset("rotation_angle", data=0.0).attrs["units"] = "degrees"
eiger.create_dataset(
"description",
data="Eiger 1.5M detector, in-house developed, Paul Scherrer Institute",
)
eiger.create_dataset(
"type",
data="Single-photon counting detector, 320 micron-thick Si chip",
)
orientation = eiger.create_group("orientation")
orientation.attrs["description"] = (
"Orientation defines the number of counterclockwise rotations by 90 deg "
"followed by a transposition to reach the 'cameraman orientation', "
"looking towards the beam."
)
orientation.create_dataset("transpose", data=1)
orientation.create_dataset("rot90", data=3)
# Soft-link recorded frame data from the BEC collection
try:
for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys():
self._safe_soft_link(eiger, k, f"{EIGER_COLL}/{k}")
except Exception:
pass
# External link to pixel mask in the Eiger master file
try:
eiger.create_ext_link(
"pixel_mask",
self.file_references["eiger_1_5"].file_path,
"/entry/instrument/detector/pixel_mask",
)
except Exception:
pass
if (
"falcon" in device_manager.devices
and device_manager.devices.falcon.enabled
and "falcon" in file_references
):
falcon = instrument.create_ext_link(
"falcon", file_references["falcon"]["path"], "entry/instrument/FalconX1"
)
# ── MCS (multi-channel scaler) ─────────────────────────────────────────
if (
"mcs" in self.device_manager.devices
and self.device_manager.devices.mcs.enabled
):
mcs_group = instrument.create_group("mcs")
mcs_group.attrs["NX_class"] = "NXdetector"
mcs_group.attrs["description"] = "MCS card cSAXS — multi-channel scaler"
self._safe_soft_link(mcs_group, "data", "/entry/collection/devices/mcs")
return storage
# ── flOMNI ────────────────────────────────────────────────────────────
# flomni_samples is used as the sentinel for the entire flOMNI setup.
# If it is absent from the device_manager (removed from config) the
# whole group is omitted. Individual datasets inside are still each
# guarded by _safe_dataset / _safe_soft_link in case a specific motor
# is temporarily disabled without removing the full setup.
if "flomni_samples" in self.device_manager.devices:
flomni = instrument.create_group("flOMNI")
flomni.attrs["NX_class"] = "NXpositioner"
flomni.attrs["description"] = "flOMNI flexible tOMography Nano Imaging"
# Galil motors — coarse sample stage
self._safe_dataset(flomni, "fsamx", "fsamx", units="mm", description="Sample coarse X")
self._safe_dataset(flomni, "fsamy", "fsamy", units="mm", description="Sample coarse Y")
self._safe_dataset(flomni, "fsamroy", "fsamroy", units="degrees", description="Sample rotation")
# Galil motors — sample transfer / tray
self._safe_dataset(flomni, "ftransx", "ftransx", units="mm", description="Sample transfer X")
self._safe_dataset(flomni, "ftransy", "ftransy", units="mm", description="Sample transfer Y")
self._safe_dataset(flomni, "ftransz", "ftransz", units="mm", description="Sample transfer Z")
self._safe_dataset(flomni, "ftray", "ftray", units="mm", description="Sample transfer tray")
# Galil motors — laser tracker
self._safe_dataset(flomni, "ftracky", "ftracky", units="mm", description="Laser tracker coarse Y")
self._safe_dataset(flomni, "ftrackz", "ftrackz", units="mm", description="Laser tracker coarse Z")
# Galil motors — X-ray eye
self._safe_dataset(flomni, "feyex", "feyex", units="mm", description="X-ray eye X")
self._safe_dataset(flomni, "feyey", "feyey", units="mm", description="X-ray eye Y")
# Galil motors — optics (zone plate)
self._safe_dataset(flomni, "foptx", "foptx", units="mm", description="Optics X")
self._safe_dataset(flomni, "fopty", "fopty", units="mm", description="Optics Y")
self._safe_dataset(flomni, "foptz", "foptz", units="mm", description="Optics Z")
# Galil motor — heater
self._safe_dataset(flomni, "fheater", "fheater", units="mm", description="Heater Y")
# Smaract motors — OSA (order-sorting aperture)
self._safe_dataset(flomni, "fosax", "fosax", units="mm", description="OSA X")
self._safe_dataset(flomni, "fosay", "fosay", units="mm", description="OSA Y")
self._safe_dataset(flomni, "fosaz", "fosaz", units="mm", description="OSA Z")
# Temperature and humidity sensor (soft link to BEC collection entry)
self._safe_soft_link(
flomni, "flomni_temphum",
"/entry/collection/devices/flomni_temphum",
)
# Real-time encoder positions (RtFlomniFlyer)
# Single soft link to the entire rt_positions folder in the BEC
# collection. This is the primary scan coordinate for ptychography.
self._safe_soft_link(
flomni, "rt_positions",
"/entry/collection/devices/rt_positions",
)

View File

@@ -105,11 +105,11 @@ class FlomniFermatScan(AsyncFlyScanBase):
def scan_report_instructions(self):
"""Scan report instructions for the progress bar"""
yield from self.stubs.scan_report_instruction({"device_progress": ["rt_flyer"]})
yield from self.stubs.scan_report_instruction({"device_progress": ["rt_positions"]})
@property
def monitor_sync(self) -> str:
return "rt_flyer"
return "rt_positions"
def initialize(self):
self.scan_motors = []
@@ -294,10 +294,10 @@ class FlomniFermatScan(AsyncFlyScanBase):
def scan_core(self):
# send off the flyer
yield from self.stubs.kickoff(device="rt_flyer")
yield from self.stubs.kickoff(device="rt_positions")
# start the readout loop of the flyer
status = yield from self.stubs.complete(device="rt_flyer", wait=False)
status = yield from self.stubs.complete(device="rt_positions", wait=False)
# read the monitors until the flyer is done
while not status.done:

View File

@@ -0,0 +1,69 @@
import pytest
from bec_server.device_server.tests.utils import DMMock
from csaxs_bec.devices.omny.shutter import MonitorSignal, OMNYFastShutter
@pytest.mark.parametrize("auto_monitor", [False, True])
def test_monitor_signal_stores_auto_monitor(auto_monitor):
signal = MonitorSignal(name="signal", auto_monitor=auto_monitor)
assert signal.auto_monitor is auto_monitor
def test_monitor_signal_put_propagates_value_to_readback_callback():
signal = MonitorSignal(name="signal", auto_monitor=True)
initial_value = signal.read()[signal.name]["value"]
callback_values = []
callback_reads = []
def _test_cb(value, old_value, **kwargs):
callback_values.append((value, old_value))
callback_reads.append(kwargs["obj"].read())
signal.subscribe(_test_cb, event_type=signal.SUB_VALUE, run=False)
signal.put(1)
assert callback_values == [(1, initial_value)]
assert len(callback_reads) == 1
assert callback_reads[0][signal.name]["value"] == 1
assert signal.read()[signal.name]["value"] == 1
signal.put(0)
assert callback_values == [(1, initial_value), (0, 1)]
assert len(callback_reads) == 2
assert callback_reads[1][signal.name]["value"] == 0
assert signal.read()[signal.name]["value"] == 0
@pytest.fixture
def omny_fast_shutter():
shutter = OMNYFastShutter(name="omny_fast_shutter", device_manager=DMMock())
try:
yield shutter
finally:
shutter.destroy()
def test_omny_fast_shutter_uses_monitor_signal_with_auto_monitor(omny_fast_shutter):
assert isinstance(omny_fast_shutter.shutter, MonitorSignal)
assert omny_fast_shutter.shutter.auto_monitor is True
def test_omny_fast_shutter_propagates_signal_changes_to_device_readback(omny_fast_shutter):
signal_name = omny_fast_shutter.shutter.name
callback_reads = []
def _test_cb(**kwargs):
callback_reads.append(omny_fast_shutter.read())
omny_fast_shutter.shutter.subscribe(_test_cb, event_type=omny_fast_shutter.shutter.SUB_VALUE, run=False)
omny_fast_shutter.shutter.put(1)
assert len(callback_reads) == 1
assert callback_reads[0][signal_name]["value"] == 1
assert omny_fast_shutter.read()[signal_name]["value"] == 1
assert omny_fast_shutter.fshstatus() == 1

View File

@@ -0,0 +1,241 @@
"""Module to test the pseudo_device module."""
import pytest
from bec_lib.atlas_models import Device
from ophyd_devices.sim.sim_signals import SetableSignal
from csaxs_bec.devices.pseudo_devices.bpm import BPM
from csaxs_bec.devices.pseudo_devices.bpm_control import _GAIN_TO_BITS, BPMControl
@pytest.fixture
def patched_dm(dm_with_devices):
# Patch missing current_session attribute in the device manager
dm = dm_with_devices
setattr(dm, "current_session", dm._session)
#
signal_lsb = SetableSignal(name="gain_lsb", value=0, kind="config")
signal_mid = SetableSignal(name="gain_mid", value=0, kind="config")
signal_msb = SetableSignal(name="gain_msb", value=0, kind="config")
signal_coupling = SetableSignal(name="coupling", value=0, kind="config")
signal_speed = SetableSignal(name="speed_mode", value=0, kind="config")
for signal in [signal_lsb, signal_mid, signal_msb, signal_coupling, signal_speed]:
dev_cfg = Device(
name=signal.name,
deviceClass="ophyd_devices.sim.sim_signals.SetableSignal",
enabled=True,
readoutPriority="baseline",
)
dm._session["devices"].append(dev_cfg.model_dump())
dm.devices._add_device(signal.name, signal)
return dm
@pytest.fixture
def bpm_control(patched_dm):
name = "bpm_control"
control_config = Device(
name=name,
deviceClass="csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl",
enabled=True,
readoutPriority="baseline",
deviceConfig={
"gain_lsb": "gain_lsb",
"gain_mid": "gain_mid",
"gain_msb": "gain_msb",
"coupling": "coupling",
"speed_mode": "speed_mode",
},
needs=["gain_lsb", "gain_mid", "gain_msb", "coupling", "speed_mode"],
)
patched_dm._session["devices"].append(control_config.model_dump())
try:
control = BPMControl(
name=name,
gain_lsb="gain_lsb",
gain_mid="gain_mid",
gain_msb="gain_msb",
coupling="coupling",
speed_mode="speed_mode",
device_manager=patched_dm,
)
patched_dm.devices._add_device(control.name, control)
control.wait_for_connection()
yield control
finally:
control.destroy()
def test_bpm_control_set_gain(bpm_control):
gain_lsb = bpm_control.device_manager.devices["gain_lsb"]
gain_mid = bpm_control.device_manager.devices["gain_mid"]
gain_msb = bpm_control.device_manager.devices["gain_msb"]
coupling = bpm_control.device_manager.devices["coupling"]
speed_mode = bpm_control.device_manager.devices["speed_mode"]
gain_lsb.put(0)
gain_mid.put(0)
gain_msb.put(0)
coupling.put(0)
speed_mode.put(1)
gain = bpm_control.gain.get()
assert _GAIN_TO_BITS.get(gain) == (0, 0, 0, speed_mode.get() == 1)
gain_val = 10000000
bpm_control.set_gain(gain_val)
assert _GAIN_TO_BITS.get(gain_val, ()) == (
gain_msb.get(),
gain_mid.get(),
gain_lsb.get(),
speed_mode.get(),
)
gain_val = 100000000000
bpm_control.set_gain(gain_val)
assert _GAIN_TO_BITS.get(gain_val, ()) == (
gain_msb.get(),
gain_mid.get(),
gain_lsb.get(),
speed_mode.get(),
)
with pytest.raises(ValueError):
bpm_control.set_gain(1005.0)
def test_bpm_control_set_coupling(bpm_control):
coupling = bpm_control.device_manager.devices["coupling"]
coupling.put(0)
bpm_control.coupling.get() == "AC"
coupling.put(1)
bpm_control.coupling.get() == "DC"
bpm_control.set_coupling("AC")
assert coupling.get() == 0
with pytest.raises(ValueError):
bpm_control.set_coupling("wrong")
@pytest.fixture
def patched_dm_bpm(dm_with_devices):
# Patch missing current_session attribute in the device manager
dm = dm_with_devices
setattr(dm, "current_session", dm._session)
#
left_top = SetableSignal(name="left_top", value=0, kind="config")
right_top = SetableSignal(name="right_top", value=0, kind="config")
right_bot = SetableSignal(name="right_bot", value=0, kind="config")
left_bot = SetableSignal(name="left_bot", value=0, kind="config")
for signal in [left_top, right_top, right_bot, left_bot]:
dev_cfg = Device(
name=signal.name,
deviceClass="ophyd_devices.sim.sim_signals.SetableSignal",
enabled=True,
readoutPriority="baseline",
)
dm._session["devices"].append(dev_cfg.model_dump())
dm.devices._add_device(signal.name, signal)
return dm
@pytest.fixture
def bpm(patched_dm_bpm):
name = "bpm"
bpm_config = Device(
name=name,
deviceClass="csaxs_bec.devices.pseudo_devices.bpm.BPM",
enabled=True,
readoutPriority="baseline",
deviceConfig={
"left_top": "left_top",
"right_top": "right_top",
"right_bot": "right_bot",
"left_bot": "left_bot",
},
needs=["left_top", "right_top", "right_bot", "left_bot"],
)
patched_dm_bpm._session["devices"].append(bpm_config.model_dump())
try:
bpm = BPM(
name=name,
left_top="left_top",
right_top="right_top",
right_bot="right_bot",
left_bot="left_bot",
device_manager=patched_dm_bpm,
)
patched_dm_bpm.devices._add_device(bpm.name, bpm)
bpm.wait_for_connection()
yield bpm
finally:
bpm.destroy()
def test_bpm_positions(bpm):
left_top = bpm.device_manager.devices["left_top"]
right_top = bpm.device_manager.devices["right_top"]
right_bot = bpm.device_manager.devices["right_bot"]
left_bot = bpm.device_manager.devices["left_bot"]
# Test center position
for signal in [left_top, right_top, right_bot, left_bot]:
signal.put(1)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 0
# Test fully left
left_top.put(1)
right_top.put(0)
right_bot.put(0)
left_bot.put(1)
assert bpm.pos_x.get() == -1
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == 0
assert bpm.intensity.get() == 2
# Test fully right
left_top.put(0)
right_top.put(1)
right_bot.put(1)
left_bot.put(0)
assert bpm.pos_x.get() == 1
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == 0
# Test fully top
left_top.put(1)
right_top.put(1)
right_bot.put(0)
left_bot.put(0)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 1
assert bpm.diagonal.get() == 0
# Test fully bottom
left_top.put(0)
right_top.put(0)
right_bot.put(1)
left_bot.put(1)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == -1
assert bpm.diagonal.get() == 0
# Diagonal beam
left_top.put(1)
right_top.put(0)
right_bot.put(1)
left_bot.put(0)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == -1
left_top.put(0)
right_top.put(1)
right_bot.put(0)
left_bot.put(1)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == 1