csaxs_nexus_format #181

Merged
holler merged 4 commits from csaxs_nexus_format into main 2026-03-31 12:59:01 +02:00
4 changed files with 446 additions and 444 deletions
+15 -12
View File
@@ -68,18 +68,21 @@ ccmx:
- cSAXS
- optics
# ccm_energy:
# readoutPriority: baseline
# deviceClass: ophyd_devices.devices.simple_positioner.PSIPositionerBase
# prefix: "X12SA-OP-CCM1:"
# override_suffixes:
# user_readback: "ENERGY-GET"
# user_setpoint: "ENERGY-SET"
# velocity: "ROTY:VELO"
# deviceTags:
# - user motors
# enabled: true
# readOnly: false
# TODO: motion does not stop at end of movement. Issue with device class
ccm_energy:
readoutPriority: baseline
deviceClass: ophyd_devices.devices.simple_positioner.PSISimplePositioner
deviceConfig:
prefix: "X12SA-OP-CCM1:"
override_suffixes:
user_readback: "ENERGY-GET"
user_setpoint: "ENERGY-SET"
velocity: "ROTY.VELO"
motor_done_move: "ROTY.DMOV"
deviceTags:
- user motors
enabled: true
readOnly: false
+4 -4
View File
@@ -1,8 +1,8 @@
# This is the main configuration file that is
# commented or uncommented according to the type of experiment
# optics:
# - !include ./bl_optics_hutch.yaml
optics:
- !include ./bl_optics_hutch.yaml
# frontend:
# - !include ./bl_frontend.yaml
@@ -10,8 +10,8 @@
endstation:
- !include ./bl_endstation.yaml
# detectors:
# - !include ./bl_detectors.yaml
detectors:
- !include ./bl_detectors.yaml
#sastt:
# - !include ./sastt.yaml
@@ -534,6 +534,7 @@ omny_panda:
INENC4.VAL.Max: interf_st_rotx_max
INENC4.VAL.Mean: interf_st_rotx_mean
INENC4.VAL.Min: interf_st_rotx_min
PCAP.GATE_DURATION.Value: pcap_gate_duration_value
deviceTags:
- detector
enabled: true
+426 -428
View File
@@ -6,467 +6,465 @@ from bec_server.file_writer.default_writer import DefaultFormat
class cSAXSNeXusFormat(DefaultFormat):
"""
NeXus file format for cSAXS beamline. This format is based on the default NeXus format, but with some additional entries specific to the cSAXS beamline. The structure of the file is based on the NeXus standard, but with some additional groups and datasets specific to the cSAXS beamline.
NeXus file format for the cSAXS beamline (BEC era).
Mirrors the old SPEC layout.xml hierarchy and adds the flOMNI instrument
group for the nano-positioning stage used in ptychography.
Device resilience
-----------------
Every device read (self.get_entry / device call) is wrapped in try/except.
If a device is removed from the BEC config file between sessions it simply
disappears from the device_manager — the corresponding dataset or link is
silently omitted from the HDF5 file without raising an error. This means
the file structure is additive: re-add the device to the config and the
field reappears automatically on the next scan.
Top-level HDF5 structure
────────────────────────
/entry NXentry (definition = NXptycho)
/sample NXsample ← primary sample group
/entry_ptycho NXentry ← generic ptycho entry
/data_soft NXentry ← convenience Eiger frame links
/control NXmonitor
/instrument NXinstrument
/source
/insertion_device
/monochromator
/XBPM3
/slit_3 … slit_5
/filter_set
/beam_stop_1 … beam_stop_2
/eiger_1_5 NXdetector
/mcs NXdetector
/flOMNI NXpositioner
Device name mapping (old SPEC → current BEC)
────────────────────────────────────────────
samx / samy → samx / samy (generic; kept for non-flOMNI configs)
sl3wh/wv/ch/cv → sl3trxi/o/b/t (individual blade motors; gap/centre TODO)
sl4wh/wv/ch/cv → sl4trxi/o/b/t
sl5wh/wv/ch/cv → sl5trxi/o/b/t
bs1x / bs1y → bs1x / bs1y
bs2x / bs2y → bs2x / bs2y
dettrx → dettrx
eiger_4 → eiger_1_5
mcs → mcs
filter_array → filter_array_1_x … filter_array_4_x
xbpm3 → xbpm3x / xbpm3y (stage positions; signal readouts TODO)
energy → ccm_energy
TODO (devices not yet in BEC list)
───────────────────────────────────
curr, idgap ring current, undulator gap
moth1, mobd monochromator crystal angles
mith, mibd, mirror_coating mirror
bpm3s/x/y/z XBPM3 signal readouts
sl0 / sl1 / sl2 upstream optics-hutch slits
slit gap / centre derived from blade pairs + calibration offset
"""
# -------------------------------------------------------------------------
# Helpers
# -------------------------------------------------------------------------
def _safe_dataset(self, group, name: str, device: str,
units: str | None = None,
description: str | None = None) -> None:
"""
Write a dataset from the BEC scan data dictionary.
Silently skips if the device was not recorded in this scan
(e.g. removed from config, readoutPriority=on_request and not triggered,
or the scan finished before the device responded).
"""
try:
value = self.get_entry(device)
ds = group.create_dataset(name, data=value)
if units:
ds.attrs["units"] = units
if description:
ds.attrs["description"] = description
except Exception:
pass
def _safe_soft_link(self, group, name: str, target: str) -> None:
"""Create a soft link; silently skip on any error."""
try:
group.create_soft_link(name, target)
except Exception:
pass
def _slit_blades(self, group, prefix: str) -> None:
"""
Store individual blade motor positions for a 4-blade slit set.
Derived quantities (gap, centre) require a per-slit calibration offset
and will be added in a later update.
"""
for blade, motor in [
("inner_x", f"{prefix}trxi"),
("outer_x", f"{prefix}trxo"),
("bottom_y", f"{prefix}trxb"),
("top_y", f"{prefix}trxt"),
]:
self._safe_dataset(group, blade, motor, units="mm")
# -------------------------------------------------------------------------
# Main format method
# -------------------------------------------------------------------------
def format(self) -> None:
"""
Prepare the NeXus file format.
Override this method in file writer plugins to customize the HDF5 file format.
"""Build the NeXus/HDF5 layout for a cSAXS scan."""
The class provides access to the following attributes:
- self.storage: The HDF5Storage object.
- self.data: The data dictionary.
- self.file_references: The file references dictionary, which has the link to external data.
- self.device_manager: The DeviceManagerBase object.
- self.get_entry(name, default=None): Helper method to get an entry from the data dictionary.
# Canonical paths referenced by multiple groups
RT_POS_PATH = "/entry/instrument/flOMNI/rt_positions"
EIGER_COLL = "/entry/collection/file_references/eiger_1_5"
See also: :class:`bec_server.file_writer.file_writer.HDF5Storage`.
# ── Root entry ────────────────────────────────────────────────────────
entry = self.storage.create_group("entry")
entry.attrs["NX_class"] = "NXentry"
entry.attrs["definition"] = "NXptycho"
"""
# ── /entry/sample ─────────────────────────────────────────────────────
# Primary sample group. Contains the name of the mounted sample and a
# link to the real-time scan positions. Generic samx/samy are recorded
# here so the group is meaningful for non-flOMNI configurations too.
sample = entry.create_group("sample")
sample.attrs["NX_class"] = "NXsample"
# Soft-link name directly to the value BEC recorded in the collection.
# Only written when flomni_samples is present; other configs leave name absent.
if "flomni_samples" in self.device_manager.devices:
self._safe_soft_link(
sample, "name",
"/entry/collection/devices/flomni_samples"
"/flomni_samples_sample_names_sample0/value",
)
# Generic coarse stage positions (meaningful in non-flOMNI setups)
self._safe_dataset(sample, "x_translation", "samx", units="mm")
self._safe_dataset(sample, "y_translation", "samy", units="mm")
# Real-time encoder positions — the primary scan coordinate
self._safe_soft_link(sample, "positions", RT_POS_PATH)
# entry = self.storage.create_group("entry")
# ── /entry/entry_ptycho ───────────────────────────────────────────────
# Generic ptychography entry. Detector data and scan positions are
# linked in from the instrument groups so this entry is self-contained
# for downstream reconstruction codes.
entry_ptycho = entry.create_group("entry_ptycho")
entry_ptycho.attrs["NX_class"] = "NXentry"
entry_ptycho.attrs["definition"] = "NXptycho"
# # /entry/control
# control = entry.create_group("control")
# control.attrs["NX_class"] = "NXmonitor"
# control.create_dataset(name="mode", data="monitor")
nxdata = entry_ptycho.create_group("data")
nxdata.attrs["NX_class"] = "NXdata"
nxdata.attrs["signal"] = "data"
# Detector frames
try:
for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys():
self._safe_soft_link(nxdata, k, f"{EIGER_COLL}/{k}")
except Exception:
pass
# Scan positions
self._safe_soft_link(nxdata, "positions", RT_POS_PATH)
# #########
# # EXAMPLE for soft link
# #########
# # /entry/data
# if "eiger_4" in self.device_manager.devices:
# entry.create_soft_link(name="data", target="/entry/instrument/eiger_4")
# Link to the primary sample group
self._safe_soft_link(entry_ptycho, "sample", "/entry/sample")
# ########
# # EXAMPLE for external link
# ########
# # control = entry.create_group("sample")
# # control.create_ext_link("data", self.file_references["eiger9m"]["path"], "EG9M/data")
# ── /entry/data_soft ──────────────────────────────────────────────────
# Convenience group mirroring the old /entry/data hardlink from layout.xml.
data_soft = entry.create_group("data_soft")
data_soft.attrs["NX_class"] = "NXentry"
try:
for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys():
self._safe_soft_link(data_soft, k, f"{EIGER_COLL}/{k}")
except Exception:
pass
# # /entry/sample
# control = entry.create_group("sample")
# control.attrs["NX_class"] = "NXsample"
# control.create_dataset(name="name", data=self.data.get("samplename"))
# control.create_dataset(name="description", data=self.data.get("sample_description"))
# ── /entry/control ────────────────────────────────────────────────────
control = entry.create_group("control")
control.attrs["NX_class"] = "NXmonitor"
control.create_dataset("mode", data="monitor")
# TODO: beam intensity integral — add device when available
# self._safe_dataset(control, "integral", "bpm_sum", units="NX_DIMENSIONLESS")
# # /entry/instrument
# instrument = entry.create_group("instrument")
# instrument.attrs["NX_class"] = "NXinstrument"
# ── /entry/instrument ─────────────────────────────────────────────────
instrument = entry.create_group("instrument")
instrument.attrs["NX_class"] = "NXinstrument"
instrument.create_dataset("name", data="cSAXS beamline")
# source = instrument.create_group("source")
# source.attrs["NX_class"] = "NXsource"
# source.create_dataset(name="type", data="Synchrotron X-ray Source")
# source.create_dataset(name="name", data="Swiss Light Source")
# source.create_dataset(name="probe", data="x-ray")
# ── Source ────────────────────────────────────────────────────────────
# Numerical values are currently unknown and stored as 0.
# Will be updated once the corresponding devices are in BEC.
source = instrument.create_group("source")
source.attrs["NX_class"] = "NXsource"
source.create_dataset("type", data="Synchrotron X-ray Source")
source.create_dataset("name", data="Swiss Light Source")
source.create_dataset("probe", data="x-ray")
source.create_dataset("sigma_x", data=0.0).attrs["units"] = "mm"
source.create_dataset("sigma_y", data=0.0).attrs["units"] = "mm"
source.create_dataset("divergence_x", data=0.0).attrs["units"] = "radians"
source.create_dataset("divergence_y", data=0.0).attrs["units"] = "radians"
# TODO: current — add device when available
# self._safe_dataset(source, "current", "curr", units="mA")
# # /entry
# entry = self.storage.create_group("entry")
# entry.attrs["NX_class"] = "NXentry"
# entry.attrs["definition"] = "NXsas"
# entry.attrs["start_time"] = self.data.get("start_time")
# entry.attrs["end_time"] = self.data.get("end_time")
# entry.attrs["version"] = 1.0
# # /entry/control
# control = entry.create_group("control")
# control.attrs["NX_class"] = "NXmonitor"
# control.create_dataset(name="mode", data="monitor")
# control.create_dataset(name="integral", data=self.get_entry("bpm4i"))
# # /entry/data
# main_data = entry.create_group("data")
# main_data.attrs["NX_class"] = "NXdata"
# if "eiger_4" in self.device_manager.devices:
# main_data.create_soft_link(name="data", target="/entry/instrument/eiger_4/data")
# elif "eiger9m" in self.device_manager.devices:
# main_data.create_soft_link(name="data", target="/entry/instrument/eiger9m/data")
# elif "pilatus_2" in self.device_manager.devices:
# main_data.create_soft_link(name="data", target="/entry/instrument/pilatus_2/data")
# # /entry/sample
# control = entry.create_group("sample")
# control.attrs["NX_class"] = "NXsample"
# control.create_dataset(name="name", data=self.get_entry("samplename"))
# control.create_dataset(name="description", data=self.data.get("sample_description"))
# x_translation = control.create_dataset(name="x_translation", data=self.get_entry("samx"))
# x_translation.attrs["units"] = "mm"
# y_translation = control.create_dataset(name="y_translation", data=self.get_entry("samy"))
# y_translation.attrs["units"] = "mm"
# temperature_log = control.create_dataset(
# name="temperature_log", data=self.get_entry("temp")
# )
# temperature_log.attrs["units"] = "K"
# # /entry/instrument
# instrument = entry.create_group("instrument")
# instrument.attrs["NX_class"] = "NXinstrument"
# instrument.create_dataset(name="name", data="cSAXS beamline")
# source = instrument.create_group("source")
# source.attrs["NX_class"] = "NXsource"
# source.create_dataset(name="type", data="Synchrotron X-ray Source")
# source.create_dataset(name="name", data="Swiss Light Source")
# source.create_dataset(name="probe", data="x-ray")
# distance = source.create_dataset(
# name="distance", data=-33800 - np.asarray(self.get_entry("samz", 0))
# )
# distance.attrs["units"] = "mm"
# sigma_x = source.create_dataset(name="sigma_x", data=0.202)
# sigma_x.attrs["units"] = "mm"
# sigma_y = source.create_dataset(name="sigma_y", data=0.018)
# sigma_y.attrs["units"] = "mm"
# divergence_x = source.create_dataset(name="divergence_x", data=0.000135)
# divergence_x.attrs["units"] = "radians"
# divergence_y = source.create_dataset(name="divergence_y", data=0.000025)
# divergence_y.attrs["units"] = "radians"
# current = source.create_dataset(name="current", data=self.get_entry("curr"))
# current.attrs["units"] = "mA"
# insertion_device = instrument.create_group("insertion_device")
# insertion_device.attrs["NX_class"] = "NXinsertion_device"
# source.create_dataset(name="type", data="undulator")
# gap = source.create_dataset(name="gap", data=self.get_entry("idgap"))
# gap.attrs["units"] = "mm"
# k = source.create_dataset(name="k", data=2.46)
# k.attrs["units"] = "NX_DIMENSIONLESS"
# length = source.create_dataset(name="length", data=1820)
# length.attrs["units"] = "mm"
# slit_0 = instrument.create_group("slit_0")
# slit_0.attrs["NX_class"] = "NXslit"
# source.create_dataset(name="material", data="OFHC Cu")
# source.create_dataset(name="description", data="Horizontal secondary source slit")
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl0wh"))
# x_gap.attrs["units"] = "mm"
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl0ch"))
# x_translation.attrs["units"] = "mm"
# distance = source.create_dataset(
# name="distance", data=-21700 - np.asarray(self.get_entry("samz", 0))
# )
# distance.attrs["units"] = "mm"
# slit_1 = instrument.create_group("slit_1")
# slit_1.attrs["NX_class"] = "NXslit"
# source.create_dataset(name="material", data="OFHC Cu")
# source.create_dataset(name="description", data="Horizontal secondary source slit")
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl1wh"))
# x_gap.attrs["units"] = "mm"
# y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl1wv"))
# y_gap.attrs["units"] = "mm"
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl1ch"))
# x_translation.attrs["units"] = "mm"
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl1ch"))
# height.attrs["units"] = "mm"
# distance = source.create_dataset(
# name="distance", data=-7800 - np.asarray(self.get_entry("samz", 0))
# )
# distance.attrs["units"] = "mm"
# mono = instrument.create_group("monochromator")
# mono.attrs["NX_class"] = "NXmonochromator"
# mokev = self.data.get("mokev", {})
# if mokev:
# if isinstance(mokev, list):
# mokev = mokev[0]
# wavelength = mono.create_dataset(
# name="wavelength", data=12.3984193 / (mokev.get("mokev").get("value") + 1e-9)
# )
# wavelength.attrs["units"] = "Angstrom"
# energy = mono.create_dataset(name="energy", data=mokev.get("mokev").get("value"))
# energy.attrs["units"] = "keV"
# mono.create_dataset(name="type", data="Double crystal fixed exit monochromator.")
# distance = mono.create_dataset(
# name="distance", data=-5220 - np.asarray(self.get_entry("samz", 0))
# )
# distance.attrs["units"] = "mm"
# ── Insertion device ──────────────────────────────────────────────────
insertion_device = instrument.create_group("insertion_device")
insertion_device.attrs["NX_class"] = "NXinsertion_device"
insertion_device.create_dataset("type", data="undulator")
insertion_device.create_dataset("k", data=0.0)
insertion_device.create_dataset("length", data=0.0).attrs["units"] = "mm"
# TODO: gap — add device when available
# self._safe_dataset(insertion_device, "gap", "idgap", units="mm")
# ── Monochromator ─────────────────────────────────────────────────────
# ccm_energy is a baseline device and is recorded in the scan data.
mono = instrument.create_group("monochromator")
mono.attrs["NX_class"] = "NXmonochromator"
mono.create_dataset("type", data="Double crystal fixed exit monochromator.")
try:
energy_kev = self.get_entry("ccm_energy")
energy_arr = np.asarray(energy_kev, dtype=float)
en_ds = mono.create_dataset("energy", data=energy_arr)
en_ds.attrs["units"] = "keV"
with np.errstate(divide="ignore", invalid="ignore"):
wavelength = np.where(energy_arr != 0, 12.3984193 / energy_arr, 0.0)
wl_ds = mono.create_dataset("wavelength", data=wavelength)
wl_ds.attrs["units"] = "Angstrom"
except Exception:
pass
# TODO: crystal angles — add moth1 / mobd when available
# crystal_1 = mono.create_group("crystal_1")
# crystal_1.attrs["NX_class"] = "NXcrystal"
# crystal_1.create_dataset(name="usage", data="Bragg")
# crystal_1.create_dataset(name="order_no", data="1")
# crystal_1.create_dataset(name="reflection", data="[1 1 1]")
# bragg_angle = crystal_1.create_dataset(name="bragg_angle", data=self.get_entry("moth1"))
# bragg_angle.attrs["units"] = "degrees"
# crystal_1.create_dataset("usage", data="Bragg")
# crystal_1.create_dataset("type", data="Si")
# crystal_1.create_dataset("order_no", data=1.0)
# crystal_1.create_dataset("reflection", data="[1 1 1]")
# self._safe_dataset(crystal_1, "bragg_angle", "moth1", units="degrees")
# crystal_2 = mono.create_group("crystal_2")
# crystal_2.attrs["NX_class"] = "NXcrystal"
# crystal_2.create_dataset(name="usage", data="Bragg")
# crystal_2.create_dataset(name="order_no", data="2")
# crystal_2.create_dataset(name="reflection", data="[1 1 1]")
# bragg_angle = crystal_2.create_dataset(name="bragg_angle", data=self.get_entry("moth1"))
# bragg_angle.attrs["units"] = "degrees"
# bend_x = crystal_2.create_dataset(name="bend_x", data=self.get_entry("mobd"))
# bend_x.attrs["units"] = "degrees"
# xbpm4 = instrument.create_group("XBPM4")
# xbpm4.attrs["NX_class"] = "NXdetector"
# xbpm4_sum = xbpm4.create_group("XBPM4_sum")
# xbpm4_sum_data = xbpm4_sum.create_dataset(name="data", data=self.get_entry("bpm4s"))
# xbpm4_sum_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm4_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.")
# xbpm4_x = xbpm4.create_group("XBPM4_x")
# xbpm4_x_data = xbpm4_x.create_dataset(name="data", data=self.get_entry("bpm4x"))
# xbpm4_x_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm4_x.create_dataset(
# name="description",
# data="Normalized difference of counts between left and right quadrants.",
# )
# xbpm4_y = xbpm4.create_group("XBPM4_y")
# xbpm4_y_data = xbpm4_y.create_dataset(name="data", data=self.get_entry("bpm4y"))
# xbpm4_y_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm4_y.create_dataset(
# name="description",
# data="Normalized difference of counts between high and low quadrants.",
# )
# xbpm4_skew = xbpm4.create_group("XBPM4_skew")
# xbpm4_skew_data = xbpm4_skew.create_dataset(name="data", data=self.get_entry("bpm4z"))
# xbpm4_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm4_skew.create_dataset(
# name="description", data="Normalized difference of counts between diagonal quadrants."
# )
# crystal_2.create_dataset("usage", data="Bragg")
# crystal_2.create_dataset("type", data="Si")
# crystal_2.create_dataset("order_no", data=2.0)
# crystal_2.create_dataset("reflection", data="[1 1 1]")
# self._safe_dataset(crystal_2, "bragg_angle", "moth1", units="degrees")
# self._safe_dataset(crystal_2, "bend_x", "mobd", units="degrees")
# ── Mirror ────────────────────────────────────────────────────────────
# TODO: mith, mibd, mirror_coating not yet in device list
# mirror = instrument.create_group("mirror")
# mirror.attrs["NX_class"] = "NXmirror"
# mirror.create_dataset(name="type", data="single")
# mirror.create_dataset("type", data="single")
# mirror.create_dataset(
# name="description",
# data="Grazing incidence mirror to reject high-harmonic wavelengths from the monochromator. There are three coating options available that are used depending on the X-ray energy, no coating (SiO2), rhodium (Rh) or platinum (Pt).",
# )
# incident_angle = mirror.create_dataset(name="incident_angle", data=self.get_entry("mith"))
# incident_angle.attrs["units"] = "degrees"
# substrate_material = mirror.create_dataset(name="substrate_material", data="SiO2")
# substrate_material.attrs["units"] = "NX_CHAR"
# coating_material = mirror.create_dataset(name="coating_material", data="SiO2")
# coating_material.attrs["units"] = "NX_CHAR"
# bend_y = mirror.create_dataset(name="bend_y", data="mibd")
# bend_y.attrs["units"] = "NX_DIMENSIONLESS"
# distance = mirror.create_dataset(
# name="distance", data=-4370 - np.asarray(self.get_entry("samz", 0))
# )
# distance.attrs["units"] = "mm"
# xbpm5 = instrument.create_group("XBPM5")
# xbpm5.attrs["NX_class"] = "NXdetector"
# xbpm5_sum = xbpm5.create_group("XBPM5_sum")
# xbpm5_sum_data = xbpm5_sum.create_dataset(name="data", data=self.get_entry("bpm5s"))
# xbpm5_sum_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm5_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.")
# xbpm5_x = xbpm5.create_group("XBPM5_x")
# xbpm5_x_data = xbpm5_x.create_dataset(name="data", data=self.get_entry("bpm5x"))
# xbpm5_x_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm5_x.create_dataset(
# name="description",
# data="Normalized difference of counts between left and right quadrants.",
# )
# xbpm5_y = xbpm5.create_group("XBPM5_y")
# xbpm5_y_data = xbpm5_y.create_dataset(name="data", data=self.get_entry("bpm5y"))
# xbpm5_y_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm5_y.create_dataset(
# name="description",
# data="Normalized difference of counts between high and low quadrants.",
# )
# xbpm5_skew = xbpm5.create_group("XBPM5_skew")
# xbpm5_skew_data = xbpm5_skew.create_dataset(name="data", data=self.get_entry("bpm5z"))
# xbpm5_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm5_skew.create_dataset(
# name="description", data="Normalized difference of counts between diagonal quadrants."
# "description",
# data=(
# "Grazing incidence mirror to reject high-harmonic wavelengths. "
# "Three coating options: no coating (SiO2), rhodium (Rh), platinum (Pt)."
# ),
# )
# mirror.create_dataset("substrate_material", data="SiO2")
# self._safe_dataset(mirror, "incident_angle", "mith", units="degrees")
# self._safe_dataset(mirror, "coating_material", "mirror_coating", units="NX_CHAR")
# self._safe_dataset(mirror, "bend_y", "mibd", units="NX_DIMENSIONLESS")
# ── Upstream slits (optics hutch) ─────────────────────────────────────
# TODO: slit_0 / slit_1 / slit_2 motors not yet in BEC device list
# slit_0 = instrument.create_group("slit_0")
# ...
# slit_1 = instrument.create_group("slit_1")
# ...
# slit_2 = instrument.create_group("slit_2")
# slit_2.attrs["NX_class"] = "NXslit"
# source.create_dataset(name="material", data="Ag")
# source.create_dataset(name="description", data="Slit 2, optics hutch")
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl2wh"))
# x_gap.attrs["units"] = "mm"
# y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl2wv"))
# y_gap.attrs["units"] = "mm"
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl2ch"))
# x_translation.attrs["units"] = "mm"
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl2cv"))
# height.attrs["units"] = "mm"
# distance = source.create_dataset(
# name="distance", data=-3140 - np.asarray(self.get_entry("samz", 0))
# )
# distance.attrs["units"] = "mm"
# ...
# slit_3 = instrument.create_group("slit_3")
# slit_3.attrs["NX_class"] = "NXslit"
# source.create_dataset(name="material", data="Si")
# source.create_dataset(name="description", data="Slit 3, experimental hutch, exposure box")
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl3wh"))
# x_gap.attrs["units"] = "mm"
# y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl3wv"))
# y_gap.attrs["units"] = "mm"
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl3ch"))
# x_translation.attrs["units"] = "mm"
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl3cv"))
# height.attrs["units"] = "mm"
# # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0))
# # distance.attrs["units"] = "mm"
# ── XBPM3 ─────────────────────────────────────────────────────────────
# xbpm3x/xbpm3y are stage motor positions for aligning the monitor.
# Signal readouts (sum/x/y/skew) are TODO once MCS channels are mapped.
xbpm3 = instrument.create_group("XBPM3")
xbpm3.attrs["NX_class"] = "NXdetector"
xbpm3.attrs["description"] = "X-ray beam position monitor 3, experimental hutch"
self._safe_dataset(xbpm3, "x_stage", "xbpm3x", units="mm",
description="XBPM3 stage x-translation")
self._safe_dataset(xbpm3, "y_stage", "xbpm3y", units="mm",
description="XBPM3 stage y-translation")
# TODO: signal readout sub-groups once MCS channels are configured
# for suffix, entry_name, desc in [
# ("sum", "bpm3s", "Sum of counts for the four quadrants."),
# ("x", "bpm3x", "Normalized diff, left vs right quadrants."),
# ("y", "bpm3y", "Normalized diff, high vs low quadrants."),
# ("skew", "bpm3z", "Normalized diff, diagonal quadrants."),
# ]:
# g = xbpm3.create_group(f"XBPM3_{suffix}")
# self._safe_dataset(g, "data", entry_name, units="NX_DIMENSIONLESS")
# g.create_dataset("description", data=desc)
# filter_set = instrument.create_group("filter_set")
# filter_set.attrs["NX_class"] = "NXattenuator"
# filter_set.create_dataset(name="material", data="Si")
# filter_set.create_dataset(
# name="description",
# data="The filter set consists of 4 linear stages, each with five filter positions. Additionally, each one allows for an out position to allow 'no filtering'.",
# )
# attenuator_transmission = filter_set.create_dataset(
# name="attenuator_transmission", data=10 ** self.get_entry("ftrans", 0)
# )
# attenuator_transmission.attrs["units"] = "NX_DIMENSIONLESS"
# ── Slit 3 (experimental hutch, exposure box) ─────────────────────────
slit_3 = instrument.create_group("slit_3")
slit_3.attrs["NX_class"] = "NXslit"
slit_3.create_dataset("material", data="Si")
slit_3.create_dataset("description", data="Slit 3, experimental hutch, exposure box")
# TODO: gap / centre require per-slit calibration offset — add later
self._slit_blades(slit_3, "sl3")
# slit_4 = instrument.create_group("slit_4")
# slit_4.attrs["NX_class"] = "NXslit"
# source.create_dataset(name="material", data="Si")
# source.create_dataset(name="description", data="Slit 4, experimental hutch, exposure box")
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl4wh"))
# x_gap.attrs["units"] = "mm"
# y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl4wv"))
# y_gap.attrs["units"] = "mm"
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl4ch"))
# x_translation.attrs["units"] = "mm"
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl4cv"))
# height.attrs["units"] = "mm"
# # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0))
# # distance.attrs["units"] = "mm"
# ── Filter set ────────────────────────────────────────────────────────
filter_set = instrument.create_group("filter_set")
filter_set.attrs["NX_class"] = "NXattenuator"
filter_set.create_dataset("material", data="Si")
filter_set.create_dataset(
"description",
data=(
"Four linear filter stages (filter_array_1_x … filter_array_4_x). "
"Each stage has five filter positions plus an 'out' position."
),
)
for i in range(1, 5):
self._safe_dataset(filter_set, f"stage_{i}_x",
f"filter_array_{i}_x", units="mm")
# TODO: attenuator_transmission = 10^(ftrans) once device is available
# slit_5 = instrument.create_group("slit_5")
# slit_5.attrs["NX_class"] = "NXslit"
# source.create_dataset(name="material", data="Si")
# source.create_dataset(name="description", data="Slit 5, experimental hutch, exposure box")
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl5wh"))
# x_gap.attrs["units"] = "mm"
# y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl5wv"))
# y_gap.attrs["units"] = "mm"
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl5ch"))
# x_translation.attrs["units"] = "mm"
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl5cv"))
# height.attrs["units"] = "mm"
# # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0))
# # distance.attrs["units"] = "mm"
# ── Slit 4 (experimental hutch, exposure box) ─────────────────────────
slit_4 = instrument.create_group("slit_4")
slit_4.attrs["NX_class"] = "NXslit"
slit_4.create_dataset("material", data="Ge")
slit_4.create_dataset("description", data="Slit 4, experimental hutch, exposure box")
self._slit_blades(slit_4, "sl4")
# beam_stop_1 = instrument.create_group("beam_stop_1")
# beam_stop_1.attrs["NX_class"] = "NX_beamstop"
# beam_stop_1.create_dataset(name="description", data="circular")
# bms1_size = beam_stop_1.create_dataset(name="size", data=3)
# bms1_size.attrs["units"] = "mm"
# bms1_x = beam_stop_1.create_dataset(name="size", data=self.get_entry("bs1x"))
# bms1_x.attrs["units"] = "mm"
# bms1_y = beam_stop_1.create_dataset(name="size", data=self.get_entry("bs1y"))
# bms1_y.attrs["units"] = "mm"
# ── Slit 5 (experimental hutch, exposure box) ─────────────────────────
slit_5 = instrument.create_group("slit_5")
slit_5.attrs["NX_class"] = "NXslit"
slit_5.create_dataset("material", data="Si")
slit_5.create_dataset("description", data="Slit 5, experimental hutch, exposure box")
self._slit_blades(slit_5, "sl5")
# beam_stop_2 = instrument.create_group("beam_stop_2")
# beam_stop_2.attrs["NX_class"] = "NX_beamstop"
# beam_stop_2.create_dataset(name="description", data="rectangular")
# bms2_size_x = beam_stop_2.create_dataset(name="size_x", data=5)
# bms2_size_x.attrs["units"] = "mm"
# bms2_size_y = beam_stop_2.create_dataset(name="size_y", data=2.25)
# bms2_size_y.attrs["units"] = "mm"
# bms2_x = beam_stop_2.create_dataset(name="size", data=self.get_entry("bs2x"))
# bms2_x.attrs["units"] = "mm"
# bms2_y = beam_stop_2.create_dataset(name="size", data=self.get_entry("bs2y"))
# bms2_y.attrs["units"] = "mm"
# bms2_data = beam_stop_2.create_dataset(name="data", data=self.get_entry("diode"))
# bms2_data.attrs["units"] = "NX_DIMENSIONLESS"
# ── Beam stop 1 ────────────────────────────────────────────────────────
beam_stop_1 = instrument.create_group("beam_stop_1")
beam_stop_1.attrs["NX_class"] = "NXbeam_stop"
beam_stop_1.create_dataset("description", data="circular")
beam_stop_1.create_dataset("size", data=3.0).attrs["units"] = "mm"
self._safe_dataset(beam_stop_1, "x", "bs1x", units="mm")
self._safe_dataset(beam_stop_1, "y", "bs1y", units="mm")
# TODO: diode signal behind beam stop 1 when device is available
# if (
# "eiger1p5m" in self.device_manager.devices
# and self.device_manager.devices.eiger1p5m.enabled
# ):
# eiger_4 = instrument.create_group("eiger_4")
# eiger_4.attrs["NX_class"] = "NXdetector"
# x_pixel_size = eiger_4.create_dataset(name="x_pixel_size", data=75)
# x_pixel_size.attrs["units"] = "um"
# y_pixel_size = eiger_4.create_dataset(name="y_pixel_size", data=75)
# y_pixel_size.attrs["units"] = "um"
# polar_angle = eiger_4.create_dataset(name="polar_angle", data=0)
# polar_angle.attrs["units"] = "degrees"
# azimuthal_angle = eiger_4.create_dataset(name="azimuthal_angle", data=0)
# azimuthal_angle.attrs["units"] = "degrees"
# rotation_angle = eiger_4.create_dataset(name="rotation_angle", data=0)
# rotation_angle.attrs["units"] = "degrees"
# description = eiger_4.create_dataset(
# name="description", data="Single-photon counting detector, 320 micron-thick Si chip"
# )
# orientation = eiger_4.create_group("orientation")
# orientation.attrs["description"] = (
# "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
# )
# orientation.create_dataset(name="transpose", data=1)
# orientation.create_dataset(name="rot90", data=3)
# ── Beam stop 2 ────────────────────────────────────────────────────────
beam_stop_2 = instrument.create_group("beam_stop_2")
beam_stop_2.attrs["NX_class"] = "NXbeam_stop"
beam_stop_2.create_dataset("description", data="rectangular")
beam_stop_2.create_dataset("size_x", data=5.0).attrs["units"] = "mm"
beam_stop_2.create_dataset("size_y", data=2.25).attrs["units"] = "mm"
self._safe_dataset(beam_stop_2, "x", "bs2x", units="mm")
self._safe_dataset(beam_stop_2, "y", "bs2y", units="mm")
# TODO: diode (transmitted signal) when device is available
# if (
# "eiger9m" in self.device_manager.devices
# and self.device_manager.devices.eiger9m.enabled
# and "eiger9m" in self.file_references
# ):
# eiger9m = instrument.create_group("eiger9m")
# eiger9m.attrs["NX_class"] = "NXdetector"
# x_pixel_size = eiger9m.create_dataset(name="x_pixel_size", data=75)
# x_pixel_size.attrs["units"] = "um"
# y_pixel_size = eiger9m.create_dataset(name="y_pixel_size", data=75)
# y_pixel_size.attrs["units"] = "um"
# polar_angle = eiger9m.create_dataset(name="polar_angle", data=0)
# polar_angle.attrs["units"] = "degrees"
# azimuthal_angle = eiger9m.create_dataset(name="azimuthal_angle", data=0)
# azimuthal_angle.attrs["units"] = "degrees"
# rotation_angle = eiger9m.create_dataset(name="rotation_angle", data=0)
# rotation_angle.attrs["units"] = "degrees"
# description = eiger9m.create_dataset(
# name="description",
# data="Eiger9M detector, in-house developed, Paul Scherrer Institute",
# )
# orientation = eiger9m.create_group("orientation")
# orientation.attrs["description"] = (
# "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
# )
# orientation.create_dataset(name="transpose", data=1)
# orientation.create_dataset(name="rot90", data=3)
# data = eiger9m.create_ext_link(
# "data", self.file_references["eiger9m"]["path"], "EG9M/data"
# )
# status = eiger9m.create_ext_link(
# "status", self.file_references["eiger9m"]["path"], "EG9M/status"
# )
# ── Detector translation ───────────────────────────────────────────────
self._safe_dataset(
instrument, "detector_translation_x", "dettrx",
units="mm", description="Detector x-translation stage",
)
# if (
# "pilatus_2" in self.device_manager.devices
# and self.device_manager.devices.pilatus_2.enabled
# and "pilatus_2" in self.file_references
# ):
# pilatus_2 = instrument.create_group("pilatus_2")
# pilatus_2.attrs["NX_class"] = "NXdetector"
# x_pixel_size = pilatus_2.create_dataset(name="x_pixel_size", data=172)
# x_pixel_size.attrs["units"] = "um"
# y_pixel_size = pilatus_2.create_dataset(name="y_pixel_size", data=172)
# y_pixel_size.attrs["units"] = "um"
# polar_angle = pilatus_2.create_dataset(name="polar_angle", data=0)
# polar_angle.attrs["units"] = "degrees"
# azimuthal_angle = pilatus_2.create_dataset(name="azimuthal_angle", data=0)
# azimuthal_angle.attrs["units"] = "degrees"
# rotation_angle = pilatus_2.create_dataset(name="rotation_angle", data=0)
# rotation_angle.attrs["units"] = "degrees"
# description = pilatus_2.create_dataset(
# name="description", data="Pilatus 300K detector, Dectris, Switzerland"
# )
# orientation = pilatus_2.create_group("orientation")
# orientation.attrs["description"] = (
# "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
# )
# orientation.create_dataset(name="transpose", data=1)
# orientation.create_dataset(name="rot90", data=2)
# data = pilatus_2.create_ext_link(
# "data", self.file_references["pilatus_2"]["path"], "entry/instrument/pilatus_2/data"
# )
# ── Eiger 1.5M detector ───────────────────────────────────────────────
if (
"eiger_1_5" in self.device_manager.devices
and self.device_manager.devices.eiger_1_5.enabled
and "eiger_1_5" in self.file_references
):
eiger = instrument.create_group("eiger_1_5")
eiger.attrs["NX_class"] = "NXdetector"
eiger.create_dataset("x_pixel_size", data=75.0).attrs["units"] = "um"
eiger.create_dataset("y_pixel_size", data=75.0).attrs["units"] = "um"
eiger.create_dataset("polar_angle", data=0.0).attrs["units"] = "degrees"
eiger.create_dataset("azimuthal_angle", data=0.0).attrs["units"] = "degrees"
eiger.create_dataset("rotation_angle", data=0.0).attrs["units"] = "degrees"
eiger.create_dataset(
"description",
data="Eiger 1.5M detector, in-house developed, Paul Scherrer Institute",
)
eiger.create_dataset(
"type",
data="Single-photon counting detector, 320 micron-thick Si chip",
)
orientation = eiger.create_group("orientation")
orientation.attrs["description"] = (
"Orientation defines the number of counterclockwise rotations by 90 deg "
"followed by a transposition to reach the 'cameraman orientation', "
"looking towards the beam."
)
orientation.create_dataset("transpose", data=1)
orientation.create_dataset("rot90", data=3)
# Soft-link recorded frame data from the BEC collection
try:
for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys():
self._safe_soft_link(eiger, k, f"{EIGER_COLL}/{k}")
except Exception:
pass
# External link to pixel mask in the Eiger master file
try:
eiger.create_ext_link(
"pixel_mask",
self.file_references["eiger_1_5"].file_path,
"/entry/instrument/detector/pixel_mask",
)
except Exception:
pass
# if (
# "falcon" in self.device_manager.devices
# and self.device_manager.devices.falcon.enabled
# and "falcon" in self.file_references
# ):
# falcon = instrument.create_ext_link(
# "falcon", self.file_references["falcon"]["path"], "entry/instrument/FalconX1"
# )
# ── MCS (multi-channel scaler) ─────────────────────────────────────────
if (
"mcs" in self.device_manager.devices
and self.device_manager.devices.mcs.enabled
):
mcs_group = instrument.create_group("mcs")
mcs_group.attrs["NX_class"] = "NXdetector"
mcs_group.attrs["description"] = "MCS card cSAXS — multi-channel scaler"
self._safe_soft_link(mcs_group, "data", "/entry/collection/devices/mcs")
# ── flOMNI ────────────────────────────────────────────────────────────
# flomni_samples is used as the sentinel for the entire flOMNI setup.
# If it is absent from the device_manager (removed from config) the
# whole group is omitted. Individual datasets inside are still each
# guarded by _safe_dataset / _safe_soft_link in case a specific motor
# is temporarily disabled without removing the full setup.
if "flomni_samples" in self.device_manager.devices:
flomni = instrument.create_group("flOMNI")
flomni.attrs["NX_class"] = "NXpositioner"
flomni.attrs["description"] = "flOMNI flexible tOMography Nano Imaging"
# Galil motors — coarse sample stage
self._safe_dataset(flomni, "fsamx", "fsamx", units="mm", description="Sample coarse X")
self._safe_dataset(flomni, "fsamy", "fsamy", units="mm", description="Sample coarse Y")
self._safe_dataset(flomni, "fsamroy", "fsamroy", units="degrees", description="Sample rotation")
# Galil motors — sample transfer / tray
self._safe_dataset(flomni, "ftransx", "ftransx", units="mm", description="Sample transfer X")
self._safe_dataset(flomni, "ftransy", "ftransy", units="mm", description="Sample transfer Y")
self._safe_dataset(flomni, "ftransz", "ftransz", units="mm", description="Sample transfer Z")
self._safe_dataset(flomni, "ftray", "ftray", units="mm", description="Sample transfer tray")
# Galil motors — laser tracker
self._safe_dataset(flomni, "ftracky", "ftracky", units="mm", description="Laser tracker coarse Y")
self._safe_dataset(flomni, "ftrackz", "ftrackz", units="mm", description="Laser tracker coarse Z")
# Galil motors — X-ray eye
self._safe_dataset(flomni, "feyex", "feyex", units="mm", description="X-ray eye X")
self._safe_dataset(flomni, "feyey", "feyey", units="mm", description="X-ray eye Y")
# Galil motors — optics (zone plate)
self._safe_dataset(flomni, "foptx", "foptx", units="mm", description="Optics X")
self._safe_dataset(flomni, "fopty", "fopty", units="mm", description="Optics Y")
self._safe_dataset(flomni, "foptz", "foptz", units="mm", description="Optics Z")
# Galil motor — heater
self._safe_dataset(flomni, "fheater", "fheater", units="mm", description="Heater Y")
# Smaract motors — OSA (order-sorting aperture)
self._safe_dataset(flomni, "fosax", "fosax", units="mm", description="OSA X")
self._safe_dataset(flomni, "fosay", "fosay", units="mm", description="OSA Y")
self._safe_dataset(flomni, "fosaz", "fosaz", units="mm", description="OSA Z")
# Temperature and humidity sensor (soft link to BEC collection entry)
self._safe_soft_link(
flomni, "flomni_temphum",
"/entry/collection/devices/flomni_temphum",
)
# Real-time encoder positions (RtFlomniFlyer)
# Single soft link to the entire rt_positions folder in the BEC
# collection. This is the primary scan coordinate for ptychography.
self._safe_soft_link(
flomni, "rt_positions",
"/entry/collection/devices/rt_positions",
)