From be508cf300ea9e205a35e87f8be5610be1ea837d Mon Sep 17 00:00:00 2001 From: x12sa Date: Mon, 30 Mar 2026 16:48:47 +0200 Subject: [PATCH 1/4] first version with working entries --- csaxs_bec/file_writer/csaxs_nexus.py | 47 ++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/csaxs_bec/file_writer/csaxs_nexus.py b/csaxs_bec/file_writer/csaxs_nexus.py index 1e9b84b..abfee67 100644 --- a/csaxs_bec/file_writer/csaxs_nexus.py +++ b/csaxs_bec/file_writer/csaxs_nexus.py @@ -24,7 +24,54 @@ class cSAXSNeXusFormat(DefaultFormat): See also: :class:`bec_server.file_writer.file_writer.HDF5Storage`. """ + entry = self.storage.create_group("entry") + # ------------------------------------------------------------------ + # Create ptychography entry + # ------------------------------------------------------------------ + entry_ptycho = entry.create_group("entry_ptycho") + entry_ptycho.attrs["NX_class"] = "NXentry" + entry_ptycho.attrs["definition"] = "NXptycho" + # ------------------------------------------------------------------ + # NXdata group + # ------------------------------------------------------------------ + nxdata = entry_ptycho.create_group("data") + nxdata.attrs["NX_class"] = "NXdata" + nxdata.attrs["signal"] = "data" + + # ------------------------------------------------------------------ + # Link Eiger data (external) + # ------------------------------------------------------------------ + data = entry.create_group("data") + data.attrs["NX_class"] = "NXentry" + # Either ext link! + #data.create_ext_link("data", self.file_references["eiger_1_5"].file_path, "entry/data") + # Or soft link to already linked path + data_soft = entry.create_group("data_soft") + data_soft.attrs["NX_class"] = "NXentry" + for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys(): + data_soft.create_soft_link(k, f"/entry/collection/file_references/eiger_1_5/{k}") + + # ------------------------------------------------------------------ + # Link positions (soft links) + # ------------------------------------------------------------------ + positions_path = "/entry/collection/devices/rt_positions" + + try: + nxdata.create_soft_link("positions", positions_path) + except Exception: + pass + + # ------------------------------------------------------------------ + # Sample group + # ------------------------------------------------------------------ + sample = entry_ptycho.create_group("sample") + sample.attrs["NX_class"] = "NXsample" + + try: + sample.create_soft_link("positions", positions_path) + except Exception: + pass # entry = self.storage.create_group("entry") # # /entry/control -- 2.52.0 From bf045dadf139f661449137f5401d52f5809f0457 Mon Sep 17 00:00:00 2001 From: x12sa Date: Mon, 30 Mar 2026 17:13:45 +0200 Subject: [PATCH 2/4] first version from claude --- csaxs_bec/file_writer/csaxs_nexus.py | 1368 +++++++++++++++++--------- 1 file changed, 916 insertions(+), 452 deletions(-) diff --git a/csaxs_bec/file_writer/csaxs_nexus.py b/csaxs_bec/file_writer/csaxs_nexus.py index abfee67..30861f7 100644 --- a/csaxs_bec/file_writer/csaxs_nexus.py +++ b/csaxs_bec/file_writer/csaxs_nexus.py @@ -6,514 +6,978 @@ from bec_server.file_writer.default_writer import DefaultFormat class cSAXSNeXusFormat(DefaultFormat): """ - NeXus file format for cSAXS beamline. This format is based on the default NeXus format, but with some additional entries specific to the cSAXS beamline. The structure of the file is based on the NeXus standard, but with some additional groups and datasets specific to the cSAXS beamline. + NeXus file format for cSAXS / OMNYfluoX beamline (BEC era). + + Structure mirrors the old SPEC layout.xml hierarchy as closely as possible, + adapted for: + - The current BEC device list (see device_manager.devices) + - NXptycho definition (ptychography mode) + - Soft/external links to Eiger 1.5M data files + + Devices not yet available in the current setup are left as commented-out + TODO blocks so they can be enabled incrementally. + + Old SPEC name → Current BEC device + ───────────────────────────────────── + samx / samy → fsamx / fsamy (Flomni sample stages) + sl3wh/wv/ch/cv → sl3trxi/o/b/t (individual blade motors) + sl4wh/wv/ch/cv → sl4trxi/o/b/t + sl5wh/wv/ch/cv → sl5trxi/o/b/t + bs1x / bs1y → bs1x / bs1y + bs2x / bs2y → bs2x / bs2y + dettrx → dettrx + eiger_4 → eiger_1_5 + rt_positions → rt_positions (RtFlomniFlyer — real-time encoder positions) + mcs → mcs (MCSCardCSAXS) + filter_array → filter_array_1_x … 4_x + xbpm3 → xbpm3x / xbpm3y (stage positions, not signal readouts) + ───────────────────────────────────── + TODO devices (not yet in BEC list): + curr, idgap, mokev, moth1, mobd (source / monochromator) + mith, mibd, mirror_coating (mirror) + bpm4s/x/y/z, bpm5s/x/y/z (XBPM signal readouts) + sl0/1/2 motors (upstream slits) """ + # ------------------------------------------------------------------------- + # Helpers + # ------------------------------------------------------------------------- + def _safe_dataset(self, group, name: str, device: str, units: str | None = None, + description: str | None = None): + """Write a dataset from scan data; silently skip if the device was not recorded.""" + try: + value = self.get_entry(device) + ds = group.create_dataset(name, data=value) + if units: + ds.attrs["units"] = units + if description: + ds.attrs["description"] = description + except Exception: + pass + + def _slit_blades(self, group, prefix: str): + """ + Store individual blade positions for a 4-blade slit. + Gap and centre can be derived in post-processing: + x_gap = outer_x − inner_x + y_gap = top_y − bottom_y + x_centre = (outer_x + inner_x) / 2 + y_centre = (top_y + bottom_y) / 2 + """ + for blade, motor in [ + ("inner_x", f"{prefix}trxi"), + ("outer_x", f"{prefix}trxo"), + ("bottom_y", f"{prefix}trxb"), + ("top_y", f"{prefix}trxt"), + ]: + self._safe_dataset(group, blade, motor, units="mm") + + # ------------------------------------------------------------------------- + # Main format method + # ------------------------------------------------------------------------- def format(self) -> None: """ - Prepare the NeXus file format. - Override this method in file writer plugins to customize the HDF5 file format. - - The class provides access to the following attributes: - - self.storage: The HDF5Storage object. - - self.data: The data dictionary. - - self.file_references: The file references dictionary, which has the link to external data. - - self.device_manager: The DeviceManagerBase object. - - self.get_entry(name, default=None): Helper method to get an entry from the data dictionary. - - See also: :class:`bec_server.file_writer.file_writer.HDF5Storage`. + Build the NeXus/HDF5 layout. + Top-level structure + ─────────────────── + /entry + /entry_ptycho [NXentry / NXptycho] ← ptychography data + positions + /data_soft [NXentry] ← soft links to Eiger frames + /control [NXmonitor] + /instrument [NXinstrument] ← mirrors old layout.xml + /source + /insertion_device + /monochromator (TODO) + /mirror (TODO) + /XBPM3 + /slit_3 … slit_5 + /filter_set + /beam_stop_1 … 2 + /eiger_1_5 [NXdetector] + /mcs [NXdetector] + /flomni_stage [NXpositioner] + /rt_positions [NXpositioner] ← real-time encoder positions (primary) """ + + # ── Root entry ─────────────────────────────────────────────────────── entry = self.storage.create_group("entry") - # ------------------------------------------------------------------ - # Create ptychography entry - # ------------------------------------------------------------------ + entry.attrs["NX_class"] = "NXentry" + entry.attrs["definition"] = "NXptycho" + + # ── Ptychography entry ─────────────────────────────────────────────── entry_ptycho = entry.create_group("entry_ptycho") entry_ptycho.attrs["NX_class"] = "NXentry" entry_ptycho.attrs["definition"] = "NXptycho" - # ------------------------------------------------------------------ - # NXdata group - # ------------------------------------------------------------------ + # NXdata — primary signal is detector frames; positions index the scan nxdata = entry_ptycho.create_group("data") nxdata.attrs["NX_class"] = "NXdata" nxdata.attrs["signal"] = "data" - # ------------------------------------------------------------------ - # Link Eiger data (external) - # ------------------------------------------------------------------ - data = entry.create_group("data") - data.attrs["NX_class"] = "NXentry" - # Either ext link! - #data.create_ext_link("data", self.file_references["eiger_1_5"].file_path, "entry/data") - # Or soft link to already linked path - data_soft = entry.create_group("data_soft") - data_soft.attrs["NX_class"] = "NXentry" - for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys(): - data_soft.create_soft_link(k, f"/entry/collection/file_references/eiger_1_5/{k}") - - # ------------------------------------------------------------------ - # Link positions (soft links) - # ------------------------------------------------------------------ - positions_path = "/entry/collection/devices/rt_positions" + # Canonical path for rt_positions — soft-linked from both entry_ptycho + # and the flomni_stage instrument group below. + rt_positions_instrument_path = "/entry/instrument/flomni_stage/rt_positions" + # Link Eiger frames into NXdata try: - nxdata.create_soft_link("positions", positions_path) + for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys(): + nxdata.create_soft_link(k, f"/entry/collection/file_references/eiger_1_5/{k}") except Exception: pass - # ------------------------------------------------------------------ - # Sample group - # ------------------------------------------------------------------ + try: + nxdata.create_soft_link("positions", rt_positions_instrument_path) + except Exception: + pass + + # NXsample — mirrors old layout.xml /entry/sample sample = entry_ptycho.create_group("sample") sample.attrs["NX_class"] = "NXsample" - try: - sample.create_soft_link("positions", positions_path) + sample.create_dataset("name", data=self.data.get("samplename", "")) + except Exception: + pass + try: + sample.create_dataset("description", data=self.data.get("sample_description", "")) + except Exception: + pass + self._safe_dataset(sample, "x_coarse_stage", "fsamx", units="mm") + self._safe_dataset(sample, "y_coarse_stage", "fsamy", units="mm") + # TODO: temperature_log — add device when available + # self._safe_dataset(sample, "temperature_log", "temp", units="K") + try: + sample.create_soft_link("positions", rt_positions_instrument_path) except Exception: pass - # entry = self.storage.create_group("entry") - # # /entry/control - # control = entry.create_group("control") - # control.attrs["NX_class"] = "NXmonitor" - # control.create_dataset(name="mode", data="monitor") + # ── Soft links to Eiger data (convenience, mirrors old hardlink /entry/data) ── + data_soft = entry.create_group("data_soft") + data_soft.attrs["NX_class"] = "NXentry" + try: + for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys(): + data_soft.create_soft_link(k, f"/entry/collection/file_references/eiger_1_5/{k}") + except Exception: + pass - # ######### - # # EXAMPLE for soft link - # ######### - # # /entry/data - # if "eiger_4" in self.device_manager.devices: - # entry.create_soft_link(name="data", target="/entry/instrument/eiger_4") + # ── Control (beam monitor) ──────────────────────────────────────────── + control = entry.create_group("control") + control.attrs["NX_class"] = "NXmonitor" + control.create_dataset("mode", data="monitor") + # TODO: beam intensity monitor integral — add device when available + # self._safe_dataset(control, "integral", "bpm_sum", units="NX_DIMENSIONLESS") - # ######## - # # EXAMPLE for external link - # ######## - # # control = entry.create_group("sample") - # # control.create_ext_link("data", self.file_references["eiger9m"]["path"], "EG9M/data") + # ── Instrument ──────────────────────────────────────────────────────── + instrument = entry.create_group("instrument") + instrument.attrs["NX_class"] = "NXinstrument" + instrument.create_dataset("name", data="cSAXS beamline / OMNYfluoX") - # # /entry/sample - # control = entry.create_group("sample") - # control.attrs["NX_class"] = "NXsample" - # control.create_dataset(name="name", data=self.data.get("samplename")) - # control.create_dataset(name="description", data=self.data.get("sample_description")) + # ── Source ──────────────────────────────────────────────────────────── + source = instrument.create_group("source") + source.attrs["NX_class"] = "NXsource" + source.create_dataset("type", data="Synchrotron X-ray Source") + source.create_dataset("name", data="Swiss Light Source") + source.create_dataset("probe", data="x-ray") + source.create_dataset("sigma_x", data=0.202).attrs["units"] = "mm" + source.create_dataset("sigma_y", data=0.018).attrs["units"] = "mm" + source.create_dataset("divergence_x", data=0.000135).attrs["units"] = "radians" + source.create_dataset("divergence_y", data=0.000025).attrs["units"] = "radians" + # TODO: ring current — add device when available + # self._safe_dataset(source, "current", "curr", units="mA") - # # /entry/instrument - # instrument = entry.create_group("instrument") - # instrument.attrs["NX_class"] = "NXinstrument" - - # source = instrument.create_group("source") - # source.attrs["NX_class"] = "NXsource" - # source.create_dataset(name="type", data="Synchrotron X-ray Source") - # source.create_dataset(name="name", data="Swiss Light Source") - # source.create_dataset(name="probe", data="x-ray") - - # # /entry - # entry = self.storage.create_group("entry") - # entry.attrs["NX_class"] = "NXentry" - # entry.attrs["definition"] = "NXsas" - # entry.attrs["start_time"] = self.data.get("start_time") - # entry.attrs["end_time"] = self.data.get("end_time") - # entry.attrs["version"] = 1.0 - - # # /entry/control - # control = entry.create_group("control") - # control.attrs["NX_class"] = "NXmonitor" - # control.create_dataset(name="mode", data="monitor") - # control.create_dataset(name="integral", data=self.get_entry("bpm4i")) - - # # /entry/data - # main_data = entry.create_group("data") - # main_data.attrs["NX_class"] = "NXdata" - # if "eiger_4" in self.device_manager.devices: - # main_data.create_soft_link(name="data", target="/entry/instrument/eiger_4/data") - # elif "eiger9m" in self.device_manager.devices: - # main_data.create_soft_link(name="data", target="/entry/instrument/eiger9m/data") - # elif "pilatus_2" in self.device_manager.devices: - # main_data.create_soft_link(name="data", target="/entry/instrument/pilatus_2/data") - - # # /entry/sample - # control = entry.create_group("sample") - # control.attrs["NX_class"] = "NXsample" - # control.create_dataset(name="name", data=self.get_entry("samplename")) - # control.create_dataset(name="description", data=self.data.get("sample_description")) - # x_translation = control.create_dataset(name="x_translation", data=self.get_entry("samx")) - # x_translation.attrs["units"] = "mm" - # y_translation = control.create_dataset(name="y_translation", data=self.get_entry("samy")) - # y_translation.attrs["units"] = "mm" - # temperature_log = control.create_dataset( - # name="temperature_log", data=self.get_entry("temp") - # ) - # temperature_log.attrs["units"] = "K" - - # # /entry/instrument - # instrument = entry.create_group("instrument") - # instrument.attrs["NX_class"] = "NXinstrument" - # instrument.create_dataset(name="name", data="cSAXS beamline") - - # source = instrument.create_group("source") - # source.attrs["NX_class"] = "NXsource" - # source.create_dataset(name="type", data="Synchrotron X-ray Source") - # source.create_dataset(name="name", data="Swiss Light Source") - # source.create_dataset(name="probe", data="x-ray") - # distance = source.create_dataset( - # name="distance", data=-33800 - np.asarray(self.get_entry("samz", 0)) - # ) - # distance.attrs["units"] = "mm" - # sigma_x = source.create_dataset(name="sigma_x", data=0.202) - # sigma_x.attrs["units"] = "mm" - # sigma_y = source.create_dataset(name="sigma_y", data=0.018) - # sigma_y.attrs["units"] = "mm" - # divergence_x = source.create_dataset(name="divergence_x", data=0.000135) - # divergence_x.attrs["units"] = "radians" - # divergence_y = source.create_dataset(name="divergence_y", data=0.000025) - # divergence_y.attrs["units"] = "radians" - # current = source.create_dataset(name="current", data=self.get_entry("curr")) - # current.attrs["units"] = "mA" - - # insertion_device = instrument.create_group("insertion_device") - # insertion_device.attrs["NX_class"] = "NXinsertion_device" - # source.create_dataset(name="type", data="undulator") - # gap = source.create_dataset(name="gap", data=self.get_entry("idgap")) - # gap.attrs["units"] = "mm" - # k = source.create_dataset(name="k", data=2.46) - # k.attrs["units"] = "NX_DIMENSIONLESS" - # length = source.create_dataset(name="length", data=1820) - # length.attrs["units"] = "mm" - - # slit_0 = instrument.create_group("slit_0") - # slit_0.attrs["NX_class"] = "NXslit" - # source.create_dataset(name="material", data="OFHC Cu") - # source.create_dataset(name="description", data="Horizontal secondary source slit") - # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl0wh")) - # x_gap.attrs["units"] = "mm" - # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl0ch")) - # x_translation.attrs["units"] = "mm" - # distance = source.create_dataset( - # name="distance", data=-21700 - np.asarray(self.get_entry("samz", 0)) - # ) - # distance.attrs["units"] = "mm" - - # slit_1 = instrument.create_group("slit_1") - # slit_1.attrs["NX_class"] = "NXslit" - # source.create_dataset(name="material", data="OFHC Cu") - # source.create_dataset(name="description", data="Horizontal secondary source slit") - # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl1wh")) - # x_gap.attrs["units"] = "mm" - # y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl1wv")) - # y_gap.attrs["units"] = "mm" - # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl1ch")) - # x_translation.attrs["units"] = "mm" - # height = source.create_dataset(name="x_translation", data=self.get_entry("sl1ch")) - # height.attrs["units"] = "mm" - # distance = source.create_dataset( - # name="distance", data=-7800 - np.asarray(self.get_entry("samz", 0)) - # ) - # distance.attrs["units"] = "mm" + # ── Insertion device ───────────────────────────────────────────────── + insertion_device = instrument.create_group("insertion_device") + insertion_device.attrs["NX_class"] = "NXinsertion_device" + insertion_device.create_dataset("type", data="undulator") + insertion_device.create_dataset("k", data=2.46) + insertion_device.create_dataset("length", data=1820.0).attrs["units"] = "mm" + # TODO: undulator gap — add device when available + # self._safe_dataset(insertion_device, "gap", "idgap", units="mm") + # ── Monochromator ──────────────────────────────────────────────────── + # TODO: mokev, moth1, mobd not yet in device list # mono = instrument.create_group("monochromator") # mono.attrs["NX_class"] = "NXmonochromator" - # mokev = self.data.get("mokev", {}) - # if mokev: - # if isinstance(mokev, list): - # mokev = mokev[0] + # mono.create_dataset("type", data="Double crystal fixed exit monochromator.") + # try: + # mokev_val = self.get_entry("mokev") # wavelength = mono.create_dataset( - # name="wavelength", data=12.3984193 / (mokev.get("mokev").get("value") + 1e-9) + # "wavelength", data=12.3984193 / (np.asarray(mokev_val) + 1e-9) # ) # wavelength.attrs["units"] = "Angstrom" - # energy = mono.create_dataset(name="energy", data=mokev.get("mokev").get("value")) + # energy = mono.create_dataset("energy", data=mokev_val) # energy.attrs["units"] = "keV" - # mono.create_dataset(name="type", data="Double crystal fixed exit monochromator.") - # distance = mono.create_dataset( - # name="distance", data=-5220 - np.asarray(self.get_entry("samz", 0)) - # ) - # distance.attrs["units"] = "mm" - + # except Exception: + # pass # crystal_1 = mono.create_group("crystal_1") # crystal_1.attrs["NX_class"] = "NXcrystal" - # crystal_1.create_dataset(name="usage", data="Bragg") - # crystal_1.create_dataset(name="order_no", data="1") - # crystal_1.create_dataset(name="reflection", data="[1 1 1]") - # bragg_angle = crystal_1.create_dataset(name="bragg_angle", data=self.get_entry("moth1")) - # bragg_angle.attrs["units"] = "degrees" - + # crystal_1.create_dataset("usage", data="Bragg") + # crystal_1.create_dataset("type", data="Si") + # crystal_1.create_dataset("order_no", data=1.0) + # crystal_1.create_dataset("reflection", data="[1 1 1]") + # self._safe_dataset(crystal_1, "bragg_angle", "moth1", units="degrees") # crystal_2 = mono.create_group("crystal_2") # crystal_2.attrs["NX_class"] = "NXcrystal" - # crystal_2.create_dataset(name="usage", data="Bragg") - # crystal_2.create_dataset(name="order_no", data="2") - # crystal_2.create_dataset(name="reflection", data="[1 1 1]") - # bragg_angle = crystal_2.create_dataset(name="bragg_angle", data=self.get_entry("moth1")) - # bragg_angle.attrs["units"] = "degrees" - # bend_x = crystal_2.create_dataset(name="bend_x", data=self.get_entry("mobd")) - # bend_x.attrs["units"] = "degrees" - - # xbpm4 = instrument.create_group("XBPM4") - # xbpm4.attrs["NX_class"] = "NXdetector" - # xbpm4_sum = xbpm4.create_group("XBPM4_sum") - # xbpm4_sum_data = xbpm4_sum.create_dataset(name="data", data=self.get_entry("bpm4s")) - # xbpm4_sum_data.attrs["units"] = "NX_DIMENSIONLESS" - # xbpm4_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.") - # xbpm4_x = xbpm4.create_group("XBPM4_x") - # xbpm4_x_data = xbpm4_x.create_dataset(name="data", data=self.get_entry("bpm4x")) - # xbpm4_x_data.attrs["units"] = "NX_DIMENSIONLESS" - # xbpm4_x.create_dataset( - # name="description", - # data="Normalized difference of counts between left and right quadrants.", - # ) - # xbpm4_y = xbpm4.create_group("XBPM4_y") - # xbpm4_y_data = xbpm4_y.create_dataset(name="data", data=self.get_entry("bpm4y")) - # xbpm4_y_data.attrs["units"] = "NX_DIMENSIONLESS" - # xbpm4_y.create_dataset( - # name="description", - # data="Normalized difference of counts between high and low quadrants.", - # ) - # xbpm4_skew = xbpm4.create_group("XBPM4_skew") - # xbpm4_skew_data = xbpm4_skew.create_dataset(name="data", data=self.get_entry("bpm4z")) - # xbpm4_skew_data.attrs["units"] = "NX_DIMENSIONLESS" - # xbpm4_skew.create_dataset( - # name="description", data="Normalized difference of counts between diagonal quadrants." - # ) + # crystal_2.create_dataset("usage", data="Bragg") + # crystal_2.create_dataset("type", data="Si") + # crystal_2.create_dataset("order_no", data=2.0) + # crystal_2.create_dataset("reflection", data="[1 1 1]") + # self._safe_dataset(crystal_2, "bragg_angle", "moth1", units="degrees") + # self._safe_dataset(crystal_2, "bend_x", "mobd", units="degrees") + # ── Mirror ──────────────────────────────────────────────────────────── + # TODO: mith, mibd, mirror_coating not yet in device list # mirror = instrument.create_group("mirror") # mirror.attrs["NX_class"] = "NXmirror" - # mirror.create_dataset(name="type", data="single") + # mirror.create_dataset("type", data="single") # mirror.create_dataset( - # name="description", - # data="Grazing incidence mirror to reject high-harmonic wavelengths from the monochromator. There are three coating options available that are used depending on the X-ray energy, no coating (SiO2), rhodium (Rh) or platinum (Pt).", - # ) - # incident_angle = mirror.create_dataset(name="incident_angle", data=self.get_entry("mith")) - # incident_angle.attrs["units"] = "degrees" - # substrate_material = mirror.create_dataset(name="substrate_material", data="SiO2") - # substrate_material.attrs["units"] = "NX_CHAR" - # coating_material = mirror.create_dataset(name="coating_material", data="SiO2") - # coating_material.attrs["units"] = "NX_CHAR" - # bend_y = mirror.create_dataset(name="bend_y", data="mibd") - # bend_y.attrs["units"] = "NX_DIMENSIONLESS" - # distance = mirror.create_dataset( - # name="distance", data=-4370 - np.asarray(self.get_entry("samz", 0)) - # ) - # distance.attrs["units"] = "mm" - - # xbpm5 = instrument.create_group("XBPM5") - # xbpm5.attrs["NX_class"] = "NXdetector" - # xbpm5_sum = xbpm5.create_group("XBPM5_sum") - # xbpm5_sum_data = xbpm5_sum.create_dataset(name="data", data=self.get_entry("bpm5s")) - # xbpm5_sum_data.attrs["units"] = "NX_DIMENSIONLESS" - # xbpm5_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.") - # xbpm5_x = xbpm5.create_group("XBPM5_x") - # xbpm5_x_data = xbpm5_x.create_dataset(name="data", data=self.get_entry("bpm5x")) - # xbpm5_x_data.attrs["units"] = "NX_DIMENSIONLESS" - # xbpm5_x.create_dataset( - # name="description", - # data="Normalized difference of counts between left and right quadrants.", - # ) - # xbpm5_y = xbpm5.create_group("XBPM5_y") - # xbpm5_y_data = xbpm5_y.create_dataset(name="data", data=self.get_entry("bpm5y")) - # xbpm5_y_data.attrs["units"] = "NX_DIMENSIONLESS" - # xbpm5_y.create_dataset( - # name="description", - # data="Normalized difference of counts between high and low quadrants.", - # ) - # xbpm5_skew = xbpm5.create_group("XBPM5_skew") - # xbpm5_skew_data = xbpm5_skew.create_dataset(name="data", data=self.get_entry("bpm5z")) - # xbpm5_skew_data.attrs["units"] = "NX_DIMENSIONLESS" - # xbpm5_skew.create_dataset( - # name="description", data="Normalized difference of counts between diagonal quadrants." + # "description", + # data=( + # "Grazing incidence mirror to reject high-harmonic wavelengths. " + # "Three coating options depending on X-ray energy: " + # "no coating (SiO2), rhodium (Rh) or platinum (Pt)." + # ), # ) + # mirror.create_dataset("substrate_material", data="SiO2") + # self._safe_dataset(mirror, "incident_angle", "mith", units="degrees") + # self._safe_dataset(mirror, "coating_material", "mirror_coating", units="NX_CHAR") + # self._safe_dataset(mirror, "bend_y", "mibd", units="NX_DIMENSIONLESS") + # ── Upstream slits (optics hutch) ──────────────────────────────────── + # TODO: slit_0 / slit_1 / slit_2 devices not yet in device list + # slit_0 = instrument.create_group("slit_0") + # slit_0.attrs["NX_class"] = "NXslit" + # slit_0.create_dataset("material", data="OFHC Cu") + # slit_0.create_dataset("description", data="Horizontal secondary source slit") + # self._safe_dataset(slit_0, "x_gap", "sl0wh", units="mm") + # self._safe_dataset(slit_0, "x_translation", "sl0ch", units="mm") + # + # slit_1 = instrument.create_group("slit_1") + # slit_1.attrs["NX_class"] = "NXslit" + # slit_1.create_dataset("material", data="OFHC Cu") + # slit_1.create_dataset("description", data="Slit 1, optics hutch") + # self._safe_dataset(slit_1, "x_gap", "sl1wh", units="mm") + # self._safe_dataset(slit_1, "y_gap", "sl1wv", units="mm") + # self._safe_dataset(slit_1, "x_translation", "sl1ch", units="mm") + # self._safe_dataset(slit_1, "height", "sl1cv", units="mm") + # # slit_2 = instrument.create_group("slit_2") # slit_2.attrs["NX_class"] = "NXslit" - # source.create_dataset(name="material", data="Ag") - # source.create_dataset(name="description", data="Slit 2, optics hutch") - # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl2wh")) - # x_gap.attrs["units"] = "mm" - # y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl2wv")) - # y_gap.attrs["units"] = "mm" - # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl2ch")) - # x_translation.attrs["units"] = "mm" - # height = source.create_dataset(name="x_translation", data=self.get_entry("sl2cv")) - # height.attrs["units"] = "mm" - # distance = source.create_dataset( - # name="distance", data=-3140 - np.asarray(self.get_entry("samz", 0)) - # ) - # distance.attrs["units"] = "mm" + # slit_2.create_dataset("material", data="Ag") + # slit_2.create_dataset("description", data="Slit 2, optics hutch") + # self._safe_dataset(slit_2, "x_gap", "sl2wh", units="mm") + # self._safe_dataset(slit_2, "y_gap", "sl2wv", units="mm") + # self._safe_dataset(slit_2, "x_translation", "sl2ch", units="mm") + # self._safe_dataset(slit_2, "height", "sl2cv", units="mm") - # slit_3 = instrument.create_group("slit_3") - # slit_3.attrs["NX_class"] = "NXslit" - # source.create_dataset(name="material", data="Si") - # source.create_dataset(name="description", data="Slit 3, experimental hutch, exposure box") - # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl3wh")) - # x_gap.attrs["units"] = "mm" - # y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl3wv")) - # y_gap.attrs["units"] = "mm" - # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl3ch")) - # x_translation.attrs["units"] = "mm" - # height = source.create_dataset(name="x_translation", data=self.get_entry("sl3cv")) - # height.attrs["units"] = "mm" - # # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0)) - # # distance.attrs["units"] = "mm" + # ── XBPM3 (experimental hutch beam position monitor) ───────────────── + # Note: xbpm3x/xbpm3y are the *stage* motor positions used to align + # the monitor, not the signal readout channels (sum/x/y/skew). + # Signal readouts are TODO once MCS channels are mapped. + xbpm3 = instrument.create_group("XBPM3") + xbpm3.attrs["NX_class"] = "NXdetector" + xbpm3.attrs["description"] = "X-ray beam position monitor 3, experimental hutch" + self._safe_dataset(xbpm3, "x_stage", "xbpm3x", units="mm", + description="XBPM3 stage x-translation") + self._safe_dataset(xbpm3, "y_stage", "xbpm3y", units="mm", + description="XBPM3 stage y-translation") + # TODO: add signal readout sub-groups once MCS channels are configured + # for suffix, entry_name, desc in [ + # ("sum", "bpm3s", "Sum of counts for the four quadrants."), + # ("x", "bpm3x", "Normalized difference of counts between left and right quadrants."), + # ("y", "bpm3y", "Normalized difference of counts between high and low quadrants."), + # ("skew", "bpm3z", "Normalized difference of counts between diagonal quadrants."), + # ]: + # g = xbpm3.create_group(f"XBPM3_{suffix}") + # self._safe_dataset(g, "data", entry_name, units="NX_DIMENSIONLESS") + # g.create_dataset("description", data=desc) - # filter_set = instrument.create_group("filter_set") - # filter_set.attrs["NX_class"] = "NXattenuator" - # filter_set.create_dataset(name="material", data="Si") - # filter_set.create_dataset( - # name="description", - # data="The filter set consists of 4 linear stages, each with five filter positions. Additionally, each one allows for an out position to allow 'no filtering'.", - # ) - # attenuator_transmission = filter_set.create_dataset( - # name="attenuator_transmission", data=10 ** self.get_entry("ftrans", 0) - # ) - # attenuator_transmission.attrs["units"] = "NX_DIMENSIONLESS" + # ── Slit 3 (experimental hutch, exposure box) ───────────────────────── + slit_3 = instrument.create_group("slit_3") + slit_3.attrs["NX_class"] = "NXslit" + slit_3.create_dataset("material", data="Si") + slit_3.create_dataset("description", data="Slit 3, experimental hutch, exposure box") + slit_3.attrs["blade_note"] = ( + "Individual blade positions stored. " + "x_gap = outer_x - inner_x, y_gap = top_y - bottom_y" + ) + self._slit_blades(slit_3, "sl3") - # slit_4 = instrument.create_group("slit_4") - # slit_4.attrs["NX_class"] = "NXslit" - # source.create_dataset(name="material", data="Si") - # source.create_dataset(name="description", data="Slit 4, experimental hutch, exposure box") - # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl4wh")) - # x_gap.attrs["units"] = "mm" - # y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl4wv")) - # y_gap.attrs["units"] = "mm" - # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl4ch")) - # x_translation.attrs["units"] = "mm" - # height = source.create_dataset(name="x_translation", data=self.get_entry("sl4cv")) - # height.attrs["units"] = "mm" - # # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0)) - # # distance.attrs["units"] = "mm" + # ── Filter set (attenuator) ─────────────────────────────────────────── + filter_set = instrument.create_group("filter_set") + filter_set.attrs["NX_class"] = "NXattenuator" + filter_set.create_dataset("material", data="Si") + filter_set.create_dataset( + "description", + data=( + "Four linear filter stages (filter_array_1_x … filter_array_4_x). " + "Each stage has five filter positions plus an 'out' position." + ), + ) + for i in range(1, 5): + self._safe_dataset(filter_set, f"stage_{i}_x", f"filter_array_{i}_x", units="mm") + # TODO: attenuator_transmission — requires a computed/mapped signal: + # attenuator_transmission = 10 ^ (ftrans) + # self._safe_dataset(filter_set, "attenuator_transmission", "ftrans", + # units="NX_DIMENSIONLESS") - # slit_5 = instrument.create_group("slit_5") - # slit_5.attrs["NX_class"] = "NXslit" - # source.create_dataset(name="material", data="Si") - # source.create_dataset(name="description", data="Slit 5, experimental hutch, exposure box") - # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl5wh")) - # x_gap.attrs["units"] = "mm" - # y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl5wv")) - # y_gap.attrs["units"] = "mm" - # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl5ch")) - # x_translation.attrs["units"] = "mm" - # height = source.create_dataset(name="x_translation", data=self.get_entry("sl5cv")) - # height.attrs["units"] = "mm" - # # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0)) - # # distance.attrs["units"] = "mm" + # ── Slit 4 (experimental hutch, exposure box) ───────────────────────── + slit_4 = instrument.create_group("slit_4") + slit_4.attrs["NX_class"] = "NXslit" + slit_4.create_dataset("material", data="Ge") + slit_4.create_dataset("description", data="Slit 4, experimental hutch, exposure box") + slit_4.attrs["blade_note"] = ( + "Individual blade positions stored. " + "x_gap = outer_x - inner_x, y_gap = top_y - bottom_y" + ) + self._slit_blades(slit_4, "sl4") - # beam_stop_1 = instrument.create_group("beam_stop_1") - # beam_stop_1.attrs["NX_class"] = "NX_beamstop" - # beam_stop_1.create_dataset(name="description", data="circular") - # bms1_size = beam_stop_1.create_dataset(name="size", data=3) - # bms1_size.attrs["units"] = "mm" - # bms1_x = beam_stop_1.create_dataset(name="size", data=self.get_entry("bs1x")) - # bms1_x.attrs["units"] = "mm" - # bms1_y = beam_stop_1.create_dataset(name="size", data=self.get_entry("bs1y")) - # bms1_y.attrs["units"] = "mm" + # ── Slit 5 (experimental hutch, exposure box) ───────────────────────── + slit_5 = instrument.create_group("slit_5") + slit_5.attrs["NX_class"] = "NXslit" + slit_5.create_dataset("material", data="Si") + slit_5.create_dataset("description", data="Slit 5, experimental hutch, exposure box") + slit_5.attrs["blade_note"] = ( + "Individual blade positions stored. " + "x_gap = outer_x - inner_x, y_gap = top_y - bottom_y" + ) + self._slit_blades(slit_5, "sl5") - # beam_stop_2 = instrument.create_group("beam_stop_2") - # beam_stop_2.attrs["NX_class"] = "NX_beamstop" - # beam_stop_2.create_dataset(name="description", data="rectangular") - # bms2_size_x = beam_stop_2.create_dataset(name="size_x", data=5) - # bms2_size_x.attrs["units"] = "mm" - # bms2_size_y = beam_stop_2.create_dataset(name="size_y", data=2.25) - # bms2_size_y.attrs["units"] = "mm" - # bms2_x = beam_stop_2.create_dataset(name="size", data=self.get_entry("bs2x")) - # bms2_x.attrs["units"] = "mm" - # bms2_y = beam_stop_2.create_dataset(name="size", data=self.get_entry("bs2y")) - # bms2_y.attrs["units"] = "mm" - # bms2_data = beam_stop_2.create_dataset(name="data", data=self.get_entry("diode")) - # bms2_data.attrs["units"] = "NX_DIMENSIONLESS" + # ── Beam stop 1 ──────────────────────────────────────────────────────── + beam_stop_1 = instrument.create_group("beam_stop_1") + beam_stop_1.attrs["NX_class"] = "NXbeam_stop" + beam_stop_1.create_dataset("description", data="circular") + beam_stop_1.create_dataset("size", data=3.0).attrs["units"] = "mm" + self._safe_dataset(beam_stop_1, "x", "bs1x", units="mm") + self._safe_dataset(beam_stop_1, "y", "bs1y", units="mm") + # TODO: diode signal behind beam stop 1 once device is available + # self._safe_dataset(beam_stop_1, "data", "diode1", units="NX_DIMENSIONLESS") - # if ( - # "eiger1p5m" in self.device_manager.devices - # and self.device_manager.devices.eiger1p5m.enabled - # ): - # eiger_4 = instrument.create_group("eiger_4") - # eiger_4.attrs["NX_class"] = "NXdetector" - # x_pixel_size = eiger_4.create_dataset(name="x_pixel_size", data=75) - # x_pixel_size.attrs["units"] = "um" - # y_pixel_size = eiger_4.create_dataset(name="y_pixel_size", data=75) - # y_pixel_size.attrs["units"] = "um" - # polar_angle = eiger_4.create_dataset(name="polar_angle", data=0) - # polar_angle.attrs["units"] = "degrees" - # azimuthal_angle = eiger_4.create_dataset(name="azimuthal_angle", data=0) - # azimuthal_angle.attrs["units"] = "degrees" - # rotation_angle = eiger_4.create_dataset(name="rotation_angle", data=0) - # rotation_angle.attrs["units"] = "degrees" - # description = eiger_4.create_dataset( - # name="description", data="Single-photon counting detector, 320 micron-thick Si chip" - # ) - # orientation = eiger_4.create_group("orientation") - # orientation.attrs["description"] = ( - # "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam." - # ) - # orientation.create_dataset(name="transpose", data=1) - # orientation.create_dataset(name="rot90", data=3) + # ── Beam stop 2 ──────────────────────────────────────────────────────── + beam_stop_2 = instrument.create_group("beam_stop_2") + beam_stop_2.attrs["NX_class"] = "NXbeam_stop" + beam_stop_2.create_dataset("description", data="rectangular") + beam_stop_2.create_dataset("size_x", data=5.0).attrs["units"] = "mm" + beam_stop_2.create_dataset("size_y", data=2.25).attrs["units"] = "mm" + self._safe_dataset(beam_stop_2, "x", "bs2x", units="mm") + self._safe_dataset(beam_stop_2, "y", "bs2y", units="mm") + # TODO: diode (scattered/transmitted signal) once device is available + # self._safe_dataset(beam_stop_2, "data", "diode", units="NX_DIMENSIONLESS") - # if ( - # "eiger9m" in self.device_manager.devices - # and self.device_manager.devices.eiger9m.enabled - # and "eiger9m" in self.file_references - # ): - # eiger9m = instrument.create_group("eiger9m") - # eiger9m.attrs["NX_class"] = "NXdetector" - # x_pixel_size = eiger9m.create_dataset(name="x_pixel_size", data=75) - # x_pixel_size.attrs["units"] = "um" - # y_pixel_size = eiger9m.create_dataset(name="y_pixel_size", data=75) - # y_pixel_size.attrs["units"] = "um" - # polar_angle = eiger9m.create_dataset(name="polar_angle", data=0) - # polar_angle.attrs["units"] = "degrees" - # azimuthal_angle = eiger9m.create_dataset(name="azimuthal_angle", data=0) - # azimuthal_angle.attrs["units"] = "degrees" - # rotation_angle = eiger9m.create_dataset(name="rotation_angle", data=0) - # rotation_angle.attrs["units"] = "degrees" - # description = eiger9m.create_dataset( - # name="description", - # data="Eiger9M detector, in-house developed, Paul Scherrer Institute", - # ) - # orientation = eiger9m.create_group("orientation") - # orientation.attrs["description"] = ( - # "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam." - # ) - # orientation.create_dataset(name="transpose", data=1) - # orientation.create_dataset(name="rot90", data=3) - # data = eiger9m.create_ext_link( - # "data", self.file_references["eiger9m"]["path"], "EG9M/data" - # ) - # status = eiger9m.create_ext_link( - # "status", self.file_references["eiger9m"]["path"], "EG9M/status" - # ) + # ── Detector translation ────────────────────────────────────────────── + self._safe_dataset( + instrument, "detector_translation_x", "dettrx", + units="mm", description="Detector x-translation stage" + ) - # if ( - # "pilatus_2" in self.device_manager.devices - # and self.device_manager.devices.pilatus_2.enabled - # and "pilatus_2" in self.file_references - # ): - # pilatus_2 = instrument.create_group("pilatus_2") - # pilatus_2.attrs["NX_class"] = "NXdetector" - # x_pixel_size = pilatus_2.create_dataset(name="x_pixel_size", data=172) - # x_pixel_size.attrs["units"] = "um" - # y_pixel_size = pilatus_2.create_dataset(name="y_pixel_size", data=172) - # y_pixel_size.attrs["units"] = "um" - # polar_angle = pilatus_2.create_dataset(name="polar_angle", data=0) - # polar_angle.attrs["units"] = "degrees" - # azimuthal_angle = pilatus_2.create_dataset(name="azimuthal_angle", data=0) - # azimuthal_angle.attrs["units"] = "degrees" - # rotation_angle = pilatus_2.create_dataset(name="rotation_angle", data=0) - # rotation_angle.attrs["units"] = "degrees" - # description = pilatus_2.create_dataset( - # name="description", data="Pilatus 300K detector, Dectris, Switzerland" - # ) - # orientation = pilatus_2.create_group("orientation") - # orientation.attrs["description"] = ( - # "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam." - # ) - # orientation.create_dataset(name="transpose", data=1) - # orientation.create_dataset(name="rot90", data=2) - # data = pilatus_2.create_ext_link( - # "data", self.file_references["pilatus_2"]["path"], "entry/instrument/pilatus_2/data" - # ) + # ── Eiger 1.5M detector ──────────────────────────────────────────────── + if ( + "eiger_1_5" in self.device_manager.devices + and self.device_manager.devices.eiger_1_5.enabled + and "eiger_1_5" in self.file_references + ): + eiger = instrument.create_group("eiger_1_5") + eiger.attrs["NX_class"] = "NXdetector" + eiger.create_dataset("x_pixel_size", data=75.0).attrs["units"] = "um" + eiger.create_dataset("y_pixel_size", data=75.0).attrs["units"] = "um" + eiger.create_dataset("polar_angle", data=0.0).attrs["units"] = "degrees" + eiger.create_dataset("azimuthal_angle", data=0.0).attrs["units"] = "degrees" + eiger.create_dataset("rotation_angle", data=0.0).attrs["units"] = "degrees" + eiger.create_dataset( + "description", + data="Eiger 1.5M detector, in-house developed, Paul Scherrer Institute", + ) + eiger.create_dataset( + "type", + data="Single-photon counting detector, 320 micron-thick Si chip", + ) + orientation = eiger.create_group("orientation") + orientation.attrs["description"] = ( + "Orientation defines the number of counterclockwise rotations by 90 deg " + "followed by a transposition to reach the 'cameraman orientation', " + "that is looking towards the beam." + ) + orientation.create_dataset("transpose", data=1) + orientation.create_dataset("rot90", data=3) + # Soft-link the Eiger data frames recorded in the collection + try: + for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys(): + eiger.create_soft_link( + k, f"/entry/collection/file_references/eiger_1_5/{k}" + ) + except Exception: + pass - # if ( - # "falcon" in self.device_manager.devices - # and self.device_manager.devices.falcon.enabled - # and "falcon" in self.file_references - # ): - # falcon = instrument.create_ext_link( - # "falcon", self.file_references["falcon"]["path"], "entry/instrument/FalconX1" - # ) + # External link to the pixel mask stored in the Eiger master file + # Path inside master: /entry/instrument/detector/pixel_mask + try: + eiger.create_ext_link( + "pixel_mask", + self.file_references["eiger_1_5"].file_path, + "/entry/instrument/detector/pixel_mask", + ) + except Exception: + pass + + # ── MCS (multi-channel scaler) ───────────────────────────────────────── + if "mcs" in self.device_manager.devices and self.device_manager.devices.mcs.enabled: + mcs_group = instrument.create_group("mcs") + mcs_group.attrs["NX_class"] = "NXdetector" + mcs_group.attrs["description"] = "MCS card cSAXS — multi-channel scaler" + try: + mcs_group.create_soft_link("data", "/entry/collection/devices/mcs") + except Exception: + pass + + # ── Flomni sample / nano-positioning stage ───────────────────────────── + flomni = instrument.create_group("flomni_stage") + flomni.attrs["NX_class"] = "NXpositioner" + flomni.attrs["description"] = "Flomni nano-positioning sample stage (OMNYfluoX)" + self._safe_dataset(flomni, "x", "fsamx", units="mm") + self._safe_dataset(flomni, "y", "fsamy", units="mm") + self._safe_dataset(flomni, "rot_y", "fsamroy", units="degrees") + # Additional Flomni axes (coarse + optics translations) — stored if scanned + self._safe_dataset(flomni, "transfer_x", "ftransx", units="mm") + self._safe_dataset(flomni, "transfer_y", "ftransy", units="mm") + self._safe_dataset(flomni, "transfer_z", "ftransz", units="mm") + self._safe_dataset(flomni, "tray", "ftray", units="mm") + self._safe_dataset(flomni, "eye_x", "feyex", units="mm") + self._safe_dataset(flomni, "eye_y", "feyey", units="mm") + self._safe_dataset(flomni, "opt_x", "foptx", units="mm") + self._safe_dataset(flomni, "opt_y", "fopty", units="mm") + self._safe_dataset(flomni, "opt_z", "foptz", units="mm") + self._safe_dataset(flomni, "track_y", "ftracky", units="mm") + self._safe_dataset(flomni, "track_z", "ftrackz", units="mm") + + # ── Real-time encoder positions (rt_positions) ───────────────────────── + # A single soft link exposes the entire rt_positions folder recorded by + # RtFlomniFlyer at the canonical instrument path so reconstruction codes + # always find it at /entry/instrument/flomni_stage/rt_positions. + try: + flomni.create_soft_link( + "rt_positions", "/entry/collection/devices/rt_positions" + ) + except Exception: + pass + + + +# from __future__ import annotations + +# import numpy as np +# from bec_server.file_writer.default_writer import DefaultFormat + + +# class cSAXSNeXusFormat(DefaultFormat): +# """ +# NeXus file format for cSAXS beamline. This format is based on the default NeXus format, but with some additional entries specific to the cSAXS beamline. The structure of the file is based on the NeXus standard, but with some additional groups and datasets specific to the cSAXS beamline. +# """ + +# def format(self) -> None: +# """ +# Prepare the NeXus file format. +# Override this method in file writer plugins to customize the HDF5 file format. + +# The class provides access to the following attributes: +# - self.storage: The HDF5Storage object. +# - self.data: The data dictionary. +# - self.file_references: The file references dictionary, which has the link to external data. +# - self.device_manager: The DeviceManagerBase object. +# - self.get_entry(name, default=None): Helper method to get an entry from the data dictionary. + +# See also: :class:`bec_server.file_writer.file_writer.HDF5Storage`. + +# """ +# entry = self.storage.create_group("entry") +# # ------------------------------------------------------------------ +# # Create ptychography entry +# # ------------------------------------------------------------------ +# entry_ptycho = entry.create_group("entry_ptycho") +# entry_ptycho.attrs["NX_class"] = "NXentry" +# entry_ptycho.attrs["definition"] = "NXptycho" + +# # ------------------------------------------------------------------ +# # NXdata group +# # ------------------------------------------------------------------ +# nxdata = entry_ptycho.create_group("data") +# nxdata.attrs["NX_class"] = "NXdata" +# nxdata.attrs["signal"] = "data" + +# # ------------------------------------------------------------------ +# # Link Eiger data (external) +# # ------------------------------------------------------------------ +# data = entry.create_group("data") +# data.attrs["NX_class"] = "NXentry" +# # Either ext link! +# #data.create_ext_link("data", self.file_references["eiger_1_5"].file_path, "entry/data") +# # Or soft link to already linked path +# data_soft = entry.create_group("data_soft") +# data_soft.attrs["NX_class"] = "NXentry" +# for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys(): +# data_soft.create_soft_link(k, f"/entry/collection/file_references/eiger_1_5/{k}") + +# # ------------------------------------------------------------------ +# # Link positions (soft links) +# # ------------------------------------------------------------------ +# positions_path = "/entry/collection/devices/rt_positions" + +# try: +# nxdata.create_soft_link("positions", positions_path) +# except Exception: +# pass + +# # ------------------------------------------------------------------ +# # Sample group +# # ------------------------------------------------------------------ +# sample = entry_ptycho.create_group("sample") +# sample.attrs["NX_class"] = "NXsample" + +# try: +# sample.create_soft_link("positions", positions_path) +# except Exception: +# pass +# # entry = self.storage.create_group("entry") + +# # # /entry/control +# # control = entry.create_group("control") +# # control.attrs["NX_class"] = "NXmonitor" +# # control.create_dataset(name="mode", data="monitor") + +# # ######### +# # # EXAMPLE for soft link +# # ######### +# # # /entry/data +# # if "eiger_4" in self.device_manager.devices: +# # entry.create_soft_link(name="data", target="/entry/instrument/eiger_4") + +# # ######## +# # # EXAMPLE for external link +# # ######## +# # # control = entry.create_group("sample") +# # # control.create_ext_link("data", self.file_references["eiger9m"]["path"], "EG9M/data") + +# # # /entry/sample +# # control = entry.create_group("sample") +# # control.attrs["NX_class"] = "NXsample" +# # control.create_dataset(name="name", data=self.data.get("samplename")) +# # control.create_dataset(name="description", data=self.data.get("sample_description")) + +# # # /entry/instrument +# # instrument = entry.create_group("instrument") +# # instrument.attrs["NX_class"] = "NXinstrument" + +# # source = instrument.create_group("source") +# # source.attrs["NX_class"] = "NXsource" +# # source.create_dataset(name="type", data="Synchrotron X-ray Source") +# # source.create_dataset(name="name", data="Swiss Light Source") +# # source.create_dataset(name="probe", data="x-ray") + +# # # /entry +# # entry = self.storage.create_group("entry") +# # entry.attrs["NX_class"] = "NXentry" +# # entry.attrs["definition"] = "NXsas" +# # entry.attrs["start_time"] = self.data.get("start_time") +# # entry.attrs["end_time"] = self.data.get("end_time") +# # entry.attrs["version"] = 1.0 + +# # # /entry/control +# # control = entry.create_group("control") +# # control.attrs["NX_class"] = "NXmonitor" +# # control.create_dataset(name="mode", data="monitor") +# # control.create_dataset(name="integral", data=self.get_entry("bpm4i")) + +# # # /entry/data +# # main_data = entry.create_group("data") +# # main_data.attrs["NX_class"] = "NXdata" +# # if "eiger_4" in self.device_manager.devices: +# # main_data.create_soft_link(name="data", target="/entry/instrument/eiger_4/data") +# # elif "eiger9m" in self.device_manager.devices: +# # main_data.create_soft_link(name="data", target="/entry/instrument/eiger9m/data") +# # elif "pilatus_2" in self.device_manager.devices: +# # main_data.create_soft_link(name="data", target="/entry/instrument/pilatus_2/data") + +# # # /entry/sample +# # control = entry.create_group("sample") +# # control.attrs["NX_class"] = "NXsample" +# # control.create_dataset(name="name", data=self.get_entry("samplename")) +# # control.create_dataset(name="description", data=self.data.get("sample_description")) +# # x_translation = control.create_dataset(name="x_translation", data=self.get_entry("samx")) +# # x_translation.attrs["units"] = "mm" +# # y_translation = control.create_dataset(name="y_translation", data=self.get_entry("samy")) +# # y_translation.attrs["units"] = "mm" +# # temperature_log = control.create_dataset( +# # name="temperature_log", data=self.get_entry("temp") +# # ) +# # temperature_log.attrs["units"] = "K" + +# # # /entry/instrument +# # instrument = entry.create_group("instrument") +# # instrument.attrs["NX_class"] = "NXinstrument" +# # instrument.create_dataset(name="name", data="cSAXS beamline") + +# # source = instrument.create_group("source") +# # source.attrs["NX_class"] = "NXsource" +# # source.create_dataset(name="type", data="Synchrotron X-ray Source") +# # source.create_dataset(name="name", data="Swiss Light Source") +# # source.create_dataset(name="probe", data="x-ray") +# # distance = source.create_dataset( +# # name="distance", data=-33800 - np.asarray(self.get_entry("samz", 0)) +# # ) +# # distance.attrs["units"] = "mm" +# # sigma_x = source.create_dataset(name="sigma_x", data=0.202) +# # sigma_x.attrs["units"] = "mm" +# # sigma_y = source.create_dataset(name="sigma_y", data=0.018) +# # sigma_y.attrs["units"] = "mm" +# # divergence_x = source.create_dataset(name="divergence_x", data=0.000135) +# # divergence_x.attrs["units"] = "radians" +# # divergence_y = source.create_dataset(name="divergence_y", data=0.000025) +# # divergence_y.attrs["units"] = "radians" +# # current = source.create_dataset(name="current", data=self.get_entry("curr")) +# # current.attrs["units"] = "mA" + +# # insertion_device = instrument.create_group("insertion_device") +# # insertion_device.attrs["NX_class"] = "NXinsertion_device" +# # source.create_dataset(name="type", data="undulator") +# # gap = source.create_dataset(name="gap", data=self.get_entry("idgap")) +# # gap.attrs["units"] = "mm" +# # k = source.create_dataset(name="k", data=2.46) +# # k.attrs["units"] = "NX_DIMENSIONLESS" +# # length = source.create_dataset(name="length", data=1820) +# # length.attrs["units"] = "mm" + +# # slit_0 = instrument.create_group("slit_0") +# # slit_0.attrs["NX_class"] = "NXslit" +# # source.create_dataset(name="material", data="OFHC Cu") +# # source.create_dataset(name="description", data="Horizontal secondary source slit") +# # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl0wh")) +# # x_gap.attrs["units"] = "mm" +# # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl0ch")) +# # x_translation.attrs["units"] = "mm" +# # distance = source.create_dataset( +# # name="distance", data=-21700 - np.asarray(self.get_entry("samz", 0)) +# # ) +# # distance.attrs["units"] = "mm" + +# # slit_1 = instrument.create_group("slit_1") +# # slit_1.attrs["NX_class"] = "NXslit" +# # source.create_dataset(name="material", data="OFHC Cu") +# # source.create_dataset(name="description", data="Horizontal secondary source slit") +# # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl1wh")) +# # x_gap.attrs["units"] = "mm" +# # y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl1wv")) +# # y_gap.attrs["units"] = "mm" +# # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl1ch")) +# # x_translation.attrs["units"] = "mm" +# # height = source.create_dataset(name="x_translation", data=self.get_entry("sl1ch")) +# # height.attrs["units"] = "mm" +# # distance = source.create_dataset( +# # name="distance", data=-7800 - np.asarray(self.get_entry("samz", 0)) +# # ) +# # distance.attrs["units"] = "mm" + +# # mono = instrument.create_group("monochromator") +# # mono.attrs["NX_class"] = "NXmonochromator" +# # mokev = self.data.get("mokev", {}) +# # if mokev: +# # if isinstance(mokev, list): +# # mokev = mokev[0] +# # wavelength = mono.create_dataset( +# # name="wavelength", data=12.3984193 / (mokev.get("mokev").get("value") + 1e-9) +# # ) +# # wavelength.attrs["units"] = "Angstrom" +# # energy = mono.create_dataset(name="energy", data=mokev.get("mokev").get("value")) +# # energy.attrs["units"] = "keV" +# # mono.create_dataset(name="type", data="Double crystal fixed exit monochromator.") +# # distance = mono.create_dataset( +# # name="distance", data=-5220 - np.asarray(self.get_entry("samz", 0)) +# # ) +# # distance.attrs["units"] = "mm" + +# # crystal_1 = mono.create_group("crystal_1") +# # crystal_1.attrs["NX_class"] = "NXcrystal" +# # crystal_1.create_dataset(name="usage", data="Bragg") +# # crystal_1.create_dataset(name="order_no", data="1") +# # crystal_1.create_dataset(name="reflection", data="[1 1 1]") +# # bragg_angle = crystal_1.create_dataset(name="bragg_angle", data=self.get_entry("moth1")) +# # bragg_angle.attrs["units"] = "degrees" + +# # crystal_2 = mono.create_group("crystal_2") +# # crystal_2.attrs["NX_class"] = "NXcrystal" +# # crystal_2.create_dataset(name="usage", data="Bragg") +# # crystal_2.create_dataset(name="order_no", data="2") +# # crystal_2.create_dataset(name="reflection", data="[1 1 1]") +# # bragg_angle = crystal_2.create_dataset(name="bragg_angle", data=self.get_entry("moth1")) +# # bragg_angle.attrs["units"] = "degrees" +# # bend_x = crystal_2.create_dataset(name="bend_x", data=self.get_entry("mobd")) +# # bend_x.attrs["units"] = "degrees" + +# # xbpm4 = instrument.create_group("XBPM4") +# # xbpm4.attrs["NX_class"] = "NXdetector" +# # xbpm4_sum = xbpm4.create_group("XBPM4_sum") +# # xbpm4_sum_data = xbpm4_sum.create_dataset(name="data", data=self.get_entry("bpm4s")) +# # xbpm4_sum_data.attrs["units"] = "NX_DIMENSIONLESS" +# # xbpm4_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.") +# # xbpm4_x = xbpm4.create_group("XBPM4_x") +# # xbpm4_x_data = xbpm4_x.create_dataset(name="data", data=self.get_entry("bpm4x")) +# # xbpm4_x_data.attrs["units"] = "NX_DIMENSIONLESS" +# # xbpm4_x.create_dataset( +# # name="description", +# # data="Normalized difference of counts between left and right quadrants.", +# # ) +# # xbpm4_y = xbpm4.create_group("XBPM4_y") +# # xbpm4_y_data = xbpm4_y.create_dataset(name="data", data=self.get_entry("bpm4y")) +# # xbpm4_y_data.attrs["units"] = "NX_DIMENSIONLESS" +# # xbpm4_y.create_dataset( +# # name="description", +# # data="Normalized difference of counts between high and low quadrants.", +# # ) +# # xbpm4_skew = xbpm4.create_group("XBPM4_skew") +# # xbpm4_skew_data = xbpm4_skew.create_dataset(name="data", data=self.get_entry("bpm4z")) +# # xbpm4_skew_data.attrs["units"] = "NX_DIMENSIONLESS" +# # xbpm4_skew.create_dataset( +# # name="description", data="Normalized difference of counts between diagonal quadrants." +# # ) + +# # mirror = instrument.create_group("mirror") +# # mirror.attrs["NX_class"] = "NXmirror" +# # mirror.create_dataset(name="type", data="single") +# # mirror.create_dataset( +# # name="description", +# # data="Grazing incidence mirror to reject high-harmonic wavelengths from the monochromator. There are three coating options available that are used depending on the X-ray energy, no coating (SiO2), rhodium (Rh) or platinum (Pt).", +# # ) +# # incident_angle = mirror.create_dataset(name="incident_angle", data=self.get_entry("mith")) +# # incident_angle.attrs["units"] = "degrees" +# # substrate_material = mirror.create_dataset(name="substrate_material", data="SiO2") +# # substrate_material.attrs["units"] = "NX_CHAR" +# # coating_material = mirror.create_dataset(name="coating_material", data="SiO2") +# # coating_material.attrs["units"] = "NX_CHAR" +# # bend_y = mirror.create_dataset(name="bend_y", data="mibd") +# # bend_y.attrs["units"] = "NX_DIMENSIONLESS" +# # distance = mirror.create_dataset( +# # name="distance", data=-4370 - np.asarray(self.get_entry("samz", 0)) +# # ) +# # distance.attrs["units"] = "mm" + +# # xbpm5 = instrument.create_group("XBPM5") +# # xbpm5.attrs["NX_class"] = "NXdetector" +# # xbpm5_sum = xbpm5.create_group("XBPM5_sum") +# # xbpm5_sum_data = xbpm5_sum.create_dataset(name="data", data=self.get_entry("bpm5s")) +# # xbpm5_sum_data.attrs["units"] = "NX_DIMENSIONLESS" +# # xbpm5_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.") +# # xbpm5_x = xbpm5.create_group("XBPM5_x") +# # xbpm5_x_data = xbpm5_x.create_dataset(name="data", data=self.get_entry("bpm5x")) +# # xbpm5_x_data.attrs["units"] = "NX_DIMENSIONLESS" +# # xbpm5_x.create_dataset( +# # name="description", +# # data="Normalized difference of counts between left and right quadrants.", +# # ) +# # xbpm5_y = xbpm5.create_group("XBPM5_y") +# # xbpm5_y_data = xbpm5_y.create_dataset(name="data", data=self.get_entry("bpm5y")) +# # xbpm5_y_data.attrs["units"] = "NX_DIMENSIONLESS" +# # xbpm5_y.create_dataset( +# # name="description", +# # data="Normalized difference of counts between high and low quadrants.", +# # ) +# # xbpm5_skew = xbpm5.create_group("XBPM5_skew") +# # xbpm5_skew_data = xbpm5_skew.create_dataset(name="data", data=self.get_entry("bpm5z")) +# # xbpm5_skew_data.attrs["units"] = "NX_DIMENSIONLESS" +# # xbpm5_skew.create_dataset( +# # name="description", data="Normalized difference of counts between diagonal quadrants." +# # ) + +# # slit_2 = instrument.create_group("slit_2") +# # slit_2.attrs["NX_class"] = "NXslit" +# # source.create_dataset(name="material", data="Ag") +# # source.create_dataset(name="description", data="Slit 2, optics hutch") +# # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl2wh")) +# # x_gap.attrs["units"] = "mm" +# # y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl2wv")) +# # y_gap.attrs["units"] = "mm" +# # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl2ch")) +# # x_translation.attrs["units"] = "mm" +# # height = source.create_dataset(name="x_translation", data=self.get_entry("sl2cv")) +# # height.attrs["units"] = "mm" +# # distance = source.create_dataset( +# # name="distance", data=-3140 - np.asarray(self.get_entry("samz", 0)) +# # ) +# # distance.attrs["units"] = "mm" + +# # slit_3 = instrument.create_group("slit_3") +# # slit_3.attrs["NX_class"] = "NXslit" +# # source.create_dataset(name="material", data="Si") +# # source.create_dataset(name="description", data="Slit 3, experimental hutch, exposure box") +# # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl3wh")) +# # x_gap.attrs["units"] = "mm" +# # y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl3wv")) +# # y_gap.attrs["units"] = "mm" +# # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl3ch")) +# # x_translation.attrs["units"] = "mm" +# # height = source.create_dataset(name="x_translation", data=self.get_entry("sl3cv")) +# # height.attrs["units"] = "mm" +# # # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0)) +# # # distance.attrs["units"] = "mm" + +# # filter_set = instrument.create_group("filter_set") +# # filter_set.attrs["NX_class"] = "NXattenuator" +# # filter_set.create_dataset(name="material", data="Si") +# # filter_set.create_dataset( +# # name="description", +# # data="The filter set consists of 4 linear stages, each with five filter positions. Additionally, each one allows for an out position to allow 'no filtering'.", +# # ) +# # attenuator_transmission = filter_set.create_dataset( +# # name="attenuator_transmission", data=10 ** self.get_entry("ftrans", 0) +# # ) +# # attenuator_transmission.attrs["units"] = "NX_DIMENSIONLESS" + +# # slit_4 = instrument.create_group("slit_4") +# # slit_4.attrs["NX_class"] = "NXslit" +# # source.create_dataset(name="material", data="Si") +# # source.create_dataset(name="description", data="Slit 4, experimental hutch, exposure box") +# # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl4wh")) +# # x_gap.attrs["units"] = "mm" +# # y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl4wv")) +# # y_gap.attrs["units"] = "mm" +# # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl4ch")) +# # x_translation.attrs["units"] = "mm" +# # height = source.create_dataset(name="x_translation", data=self.get_entry("sl4cv")) +# # height.attrs["units"] = "mm" +# # # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0)) +# # # distance.attrs["units"] = "mm" + +# # slit_5 = instrument.create_group("slit_5") +# # slit_5.attrs["NX_class"] = "NXslit" +# # source.create_dataset(name="material", data="Si") +# # source.create_dataset(name="description", data="Slit 5, experimental hutch, exposure box") +# # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl5wh")) +# # x_gap.attrs["units"] = "mm" +# # y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl5wv")) +# # y_gap.attrs["units"] = "mm" +# # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl5ch")) +# # x_translation.attrs["units"] = "mm" +# # height = source.create_dataset(name="x_translation", data=self.get_entry("sl5cv")) +# # height.attrs["units"] = "mm" +# # # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0)) +# # # distance.attrs["units"] = "mm" + +# # beam_stop_1 = instrument.create_group("beam_stop_1") +# # beam_stop_1.attrs["NX_class"] = "NX_beamstop" +# # beam_stop_1.create_dataset(name="description", data="circular") +# # bms1_size = beam_stop_1.create_dataset(name="size", data=3) +# # bms1_size.attrs["units"] = "mm" +# # bms1_x = beam_stop_1.create_dataset(name="size", data=self.get_entry("bs1x")) +# # bms1_x.attrs["units"] = "mm" +# # bms1_y = beam_stop_1.create_dataset(name="size", data=self.get_entry("bs1y")) +# # bms1_y.attrs["units"] = "mm" + +# # beam_stop_2 = instrument.create_group("beam_stop_2") +# # beam_stop_2.attrs["NX_class"] = "NX_beamstop" +# # beam_stop_2.create_dataset(name="description", data="rectangular") +# # bms2_size_x = beam_stop_2.create_dataset(name="size_x", data=5) +# # bms2_size_x.attrs["units"] = "mm" +# # bms2_size_y = beam_stop_2.create_dataset(name="size_y", data=2.25) +# # bms2_size_y.attrs["units"] = "mm" +# # bms2_x = beam_stop_2.create_dataset(name="size", data=self.get_entry("bs2x")) +# # bms2_x.attrs["units"] = "mm" +# # bms2_y = beam_stop_2.create_dataset(name="size", data=self.get_entry("bs2y")) +# # bms2_y.attrs["units"] = "mm" +# # bms2_data = beam_stop_2.create_dataset(name="data", data=self.get_entry("diode")) +# # bms2_data.attrs["units"] = "NX_DIMENSIONLESS" + +# # if ( +# # "eiger1p5m" in self.device_manager.devices +# # and self.device_manager.devices.eiger1p5m.enabled +# # ): +# # eiger_4 = instrument.create_group("eiger_4") +# # eiger_4.attrs["NX_class"] = "NXdetector" +# # x_pixel_size = eiger_4.create_dataset(name="x_pixel_size", data=75) +# # x_pixel_size.attrs["units"] = "um" +# # y_pixel_size = eiger_4.create_dataset(name="y_pixel_size", data=75) +# # y_pixel_size.attrs["units"] = "um" +# # polar_angle = eiger_4.create_dataset(name="polar_angle", data=0) +# # polar_angle.attrs["units"] = "degrees" +# # azimuthal_angle = eiger_4.create_dataset(name="azimuthal_angle", data=0) +# # azimuthal_angle.attrs["units"] = "degrees" +# # rotation_angle = eiger_4.create_dataset(name="rotation_angle", data=0) +# # rotation_angle.attrs["units"] = "degrees" +# # description = eiger_4.create_dataset( +# # name="description", data="Single-photon counting detector, 320 micron-thick Si chip" +# # ) +# # orientation = eiger_4.create_group("orientation") +# # orientation.attrs["description"] = ( +# # "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam." +# # ) +# # orientation.create_dataset(name="transpose", data=1) +# # orientation.create_dataset(name="rot90", data=3) + +# # if ( +# # "eiger9m" in self.device_manager.devices +# # and self.device_manager.devices.eiger9m.enabled +# # and "eiger9m" in self.file_references +# # ): +# # eiger9m = instrument.create_group("eiger9m") +# # eiger9m.attrs["NX_class"] = "NXdetector" +# # x_pixel_size = eiger9m.create_dataset(name="x_pixel_size", data=75) +# # x_pixel_size.attrs["units"] = "um" +# # y_pixel_size = eiger9m.create_dataset(name="y_pixel_size", data=75) +# # y_pixel_size.attrs["units"] = "um" +# # polar_angle = eiger9m.create_dataset(name="polar_angle", data=0) +# # polar_angle.attrs["units"] = "degrees" +# # azimuthal_angle = eiger9m.create_dataset(name="azimuthal_angle", data=0) +# # azimuthal_angle.attrs["units"] = "degrees" +# # rotation_angle = eiger9m.create_dataset(name="rotation_angle", data=0) +# # rotation_angle.attrs["units"] = "degrees" +# # description = eiger9m.create_dataset( +# # name="description", +# # data="Eiger9M detector, in-house developed, Paul Scherrer Institute", +# # ) +# # orientation = eiger9m.create_group("orientation") +# # orientation.attrs["description"] = ( +# # "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam." +# # ) +# # orientation.create_dataset(name="transpose", data=1) +# # orientation.create_dataset(name="rot90", data=3) +# # data = eiger9m.create_ext_link( +# # "data", self.file_references["eiger9m"]["path"], "EG9M/data" +# # ) +# # status = eiger9m.create_ext_link( +# # "status", self.file_references["eiger9m"]["path"], "EG9M/status" +# # ) + +# # if ( +# # "pilatus_2" in self.device_manager.devices +# # and self.device_manager.devices.pilatus_2.enabled +# # and "pilatus_2" in self.file_references +# # ): +# # pilatus_2 = instrument.create_group("pilatus_2") +# # pilatus_2.attrs["NX_class"] = "NXdetector" +# # x_pixel_size = pilatus_2.create_dataset(name="x_pixel_size", data=172) +# # x_pixel_size.attrs["units"] = "um" +# # y_pixel_size = pilatus_2.create_dataset(name="y_pixel_size", data=172) +# # y_pixel_size.attrs["units"] = "um" +# # polar_angle = pilatus_2.create_dataset(name="polar_angle", data=0) +# # polar_angle.attrs["units"] = "degrees" +# # azimuthal_angle = pilatus_2.create_dataset(name="azimuthal_angle", data=0) +# # azimuthal_angle.attrs["units"] = "degrees" +# # rotation_angle = pilatus_2.create_dataset(name="rotation_angle", data=0) +# # rotation_angle.attrs["units"] = "degrees" +# # description = pilatus_2.create_dataset( +# # name="description", data="Pilatus 300K detector, Dectris, Switzerland" +# # ) +# # orientation = pilatus_2.create_group("orientation") +# # orientation.attrs["description"] = ( +# # "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam." +# # ) +# # orientation.create_dataset(name="transpose", data=1) +# # orientation.create_dataset(name="rot90", data=2) +# # data = pilatus_2.create_ext_link( +# # "data", self.file_references["pilatus_2"]["path"], "entry/instrument/pilatus_2/data" +# # ) + +# # if ( +# # "falcon" in self.device_manager.devices +# # and self.device_manager.devices.falcon.enabled +# # and "falcon" in self.file_references +# # ): +# # falcon = instrument.create_ext_link( +# # "falcon", self.file_references["falcon"]["path"], "entry/instrument/FalconX1" +# # ) -- 2.52.0 From 2fac8bc1d75c97ddca493e2e28bcb7bf35c62a22 Mon Sep 17 00:00:00 2001 From: x12sa Date: Tue, 31 Mar 2026 11:12:55 +0200 Subject: [PATCH 3/4] minor config updates --- csaxs_bec/device_configs/bl_optics_hutch.yaml | 27 ++++++++++--------- csaxs_bec/device_configs/main.yaml | 8 +++--- csaxs_bec/device_configs/ptycho_flomni.yaml | 1 + csaxs_bec/file_writer/csaxs_nexus.py | 11 +++++++- 4 files changed, 30 insertions(+), 17 deletions(-) diff --git a/csaxs_bec/device_configs/bl_optics_hutch.yaml b/csaxs_bec/device_configs/bl_optics_hutch.yaml index 98f39eb..c08cc68 100644 --- a/csaxs_bec/device_configs/bl_optics_hutch.yaml +++ b/csaxs_bec/device_configs/bl_optics_hutch.yaml @@ -68,18 +68,21 @@ ccmx: - cSAXS - optics -# ccm_energy: -# readoutPriority: baseline -# deviceClass: ophyd_devices.devices.simple_positioner.PSIPositionerBase -# prefix: "X12SA-OP-CCM1:" -# override_suffixes: -# user_readback: "ENERGY-GET" -# user_setpoint: "ENERGY-SET" -# velocity: "ROTY:VELO" -# deviceTags: -# - user motors -# enabled: true -# readOnly: false +# TODO: motion does not stop at end of movement. Issue with device class +ccm_energy: + readoutPriority: baseline + deviceClass: ophyd_devices.devices.simple_positioner.PSISimplePositioner + deviceConfig: + prefix: "X12SA-OP-CCM1:" + override_suffixes: + user_readback: "ENERGY-GET" + user_setpoint: "ENERGY-SET" + velocity: "ROTY.VELO" + motor_done_move: "ROTY.DMOV" + deviceTags: + - user motors + enabled: true + readOnly: false diff --git a/csaxs_bec/device_configs/main.yaml b/csaxs_bec/device_configs/main.yaml index dc05949..fda1777 100644 --- a/csaxs_bec/device_configs/main.yaml +++ b/csaxs_bec/device_configs/main.yaml @@ -1,8 +1,8 @@ # This is the main configuration file that is # commented or uncommented according to the type of experiment -# optics: -# - !include ./bl_optics_hutch.yaml +optics: + - !include ./bl_optics_hutch.yaml # frontend: # - !include ./bl_frontend.yaml @@ -10,8 +10,8 @@ endstation: - !include ./bl_endstation.yaml -# detectors: -# - !include ./bl_detectors.yaml +detectors: + - !include ./bl_detectors.yaml #sastt: # - !include ./sastt.yaml diff --git a/csaxs_bec/device_configs/ptycho_flomni.yaml b/csaxs_bec/device_configs/ptycho_flomni.yaml index 3013bc2..dbfd9ff 100644 --- a/csaxs_bec/device_configs/ptycho_flomni.yaml +++ b/csaxs_bec/device_configs/ptycho_flomni.yaml @@ -534,6 +534,7 @@ omny_panda: INENC4.VAL.Max: interf_st_rotx_max INENC4.VAL.Mean: interf_st_rotx_mean INENC4.VAL.Min: interf_st_rotx_min + PCAP.GATE_DURATION.Value: pcap_gate_duration_value deviceTags: - detector enabled: true diff --git a/csaxs_bec/file_writer/csaxs_nexus.py b/csaxs_bec/file_writer/csaxs_nexus.py index 30861f7..9c0f93f 100644 --- a/csaxs_bec/file_writer/csaxs_nexus.py +++ b/csaxs_bec/file_writer/csaxs_nexus.py @@ -3,6 +3,15 @@ from __future__ import annotations import numpy as np from bec_server.file_writer.default_writer import DefaultFormat +#there should be entry/sample, sample name not added to entry, this should have the positions +#start and end time of scan +#scan command and parameters +#slit width and rio to config file +#filter set from filtrans device +#ptycho entry with sample, scan position, detector data +#monochromator +#check enable out of detector +#funny namings down there class cSAXSNeXusFormat(DefaultFormat): """ @@ -432,7 +441,7 @@ class cSAXSNeXusFormat(DefaultFormat): # ── Flomni sample / nano-positioning stage ───────────────────────────── flomni = instrument.create_group("flomni_stage") flomni.attrs["NX_class"] = "NXpositioner" - flomni.attrs["description"] = "Flomni nano-positioning sample stage (OMNYfluoX)" + flomni.attrs["description"] = "Flomni nano-positioning sample stage" self._safe_dataset(flomni, "x", "fsamx", units="mm") self._safe_dataset(flomni, "y", "fsamy", units="mm") self._safe_dataset(flomni, "rot_y", "fsamroy", units="degrees") -- 2.52.0 From bdc996d3b21c77909592c1cf23046d3603ca8e5b Mon Sep 17 00:00:00 2001 From: x12sa Date: Tue, 31 Mar 2026 12:56:15 +0200 Subject: [PATCH 4/4] some adjustments in structure --- csaxs_bec/file_writer/csaxs_nexus.py | 1002 ++++++-------------------- 1 file changed, 240 insertions(+), 762 deletions(-) diff --git a/csaxs_bec/file_writer/csaxs_nexus.py b/csaxs_bec/file_writer/csaxs_nexus.py index 9c0f93f..5ac05c1 100644 --- a/csaxs_bec/file_writer/csaxs_nexus.py +++ b/csaxs_bec/file_writer/csaxs_nexus.py @@ -3,57 +3,80 @@ from __future__ import annotations import numpy as np from bec_server.file_writer.default_writer import DefaultFormat -#there should be entry/sample, sample name not added to entry, this should have the positions -#start and end time of scan -#scan command and parameters -#slit width and rio to config file -#filter set from filtrans device -#ptycho entry with sample, scan position, detector data -#monochromator -#check enable out of detector -#funny namings down there class cSAXSNeXusFormat(DefaultFormat): """ - NeXus file format for cSAXS / OMNYfluoX beamline (BEC era). + NeXus file format for the cSAXS beamline (BEC era). - Structure mirrors the old SPEC layout.xml hierarchy as closely as possible, - adapted for: - - The current BEC device list (see device_manager.devices) - - NXptycho definition (ptychography mode) - - Soft/external links to Eiger 1.5M data files + Mirrors the old SPEC layout.xml hierarchy and adds the flOMNI instrument + group for the nano-positioning stage used in ptychography. - Devices not yet available in the current setup are left as commented-out - TODO blocks so they can be enabled incrementally. + Device resilience + ----------------- + Every device read (self.get_entry / device call) is wrapped in try/except. + If a device is removed from the BEC config file between sessions it simply + disappears from the device_manager — the corresponding dataset or link is + silently omitted from the HDF5 file without raising an error. This means + the file structure is additive: re-add the device to the config and the + field reappears automatically on the next scan. - Old SPEC name → Current BEC device - ───────────────────────────────────── - samx / samy → fsamx / fsamy (Flomni sample stages) - sl3wh/wv/ch/cv → sl3trxi/o/b/t (individual blade motors) + Top-level HDF5 structure + ──────────────────────── + /entry NXentry (definition = NXptycho) + /sample NXsample ← primary sample group + /entry_ptycho NXentry ← generic ptycho entry + /data_soft NXentry ← convenience Eiger frame links + /control NXmonitor + /instrument NXinstrument + /source + /insertion_device + /monochromator + /XBPM3 + /slit_3 … slit_5 + /filter_set + /beam_stop_1 … beam_stop_2 + /eiger_1_5 NXdetector + /mcs NXdetector + /flOMNI NXpositioner + + Device name mapping (old SPEC → current BEC) + ──────────────────────────────────────────── + samx / samy → samx / samy (generic; kept for non-flOMNI configs) + sl3wh/wv/ch/cv → sl3trxi/o/b/t (individual blade motors; gap/centre TODO) sl4wh/wv/ch/cv → sl4trxi/o/b/t sl5wh/wv/ch/cv → sl5trxi/o/b/t bs1x / bs1y → bs1x / bs1y bs2x / bs2y → bs2x / bs2y dettrx → dettrx eiger_4 → eiger_1_5 - rt_positions → rt_positions (RtFlomniFlyer — real-time encoder positions) - mcs → mcs (MCSCardCSAXS) - filter_array → filter_array_1_x … 4_x - xbpm3 → xbpm3x / xbpm3y (stage positions, not signal readouts) - ───────────────────────────────────── - TODO devices (not yet in BEC list): - curr, idgap, mokev, moth1, mobd (source / monochromator) - mith, mibd, mirror_coating (mirror) - bpm4s/x/y/z, bpm5s/x/y/z (XBPM signal readouts) - sl0/1/2 motors (upstream slits) + mcs → mcs + filter_array → filter_array_1_x … filter_array_4_x + xbpm3 → xbpm3x / xbpm3y (stage positions; signal readouts TODO) + energy → ccm_energy + + TODO (devices not yet in BEC list) + ─────────────────────────────────── + curr, idgap ring current, undulator gap + moth1, mobd monochromator crystal angles + mith, mibd, mirror_coating mirror + bpm3s/x/y/z XBPM3 signal readouts + sl0 / sl1 / sl2 upstream optics-hutch slits + slit gap / centre derived from blade pairs + calibration offset """ # ------------------------------------------------------------------------- # Helpers # ------------------------------------------------------------------------- - def _safe_dataset(self, group, name: str, device: str, units: str | None = None, - description: str | None = None): - """Write a dataset from scan data; silently skip if the device was not recorded.""" + + def _safe_dataset(self, group, name: str, device: str, + units: str | None = None, + description: str | None = None) -> None: + """ + Write a dataset from the BEC scan data dictionary. + Silently skips if the device was not recorded in this scan + (e.g. removed from config, readoutPriority=on_request and not triggered, + or the scan finished before the device responded). + """ try: value = self.get_entry(device) ds = group.create_dataset(name, data=value) @@ -64,14 +87,18 @@ class cSAXSNeXusFormat(DefaultFormat): except Exception: pass - def _slit_blades(self, group, prefix: str): + def _safe_soft_link(self, group, name: str, target: str) -> None: + """Create a soft link; silently skip on any error.""" + try: + group.create_soft_link(name, target) + except Exception: + pass + + def _slit_blades(self, group, prefix: str) -> None: """ - Store individual blade positions for a 4-blade slit. - Gap and centre can be derived in post-processing: - x_gap = outer_x − inner_x - y_gap = top_y − bottom_y - x_centre = (outer_x + inner_x) / 2 - y_centre = (top_y + bottom_y) / 2 + Store individual blade motor positions for a 4-blade slit set. + Derived quantities (gap, centre) require a per-slit calibration offset + and will be added in a later update. """ for blade, motor in [ ("inner_x", f"{prefix}trxi"), @@ -81,143 +108,129 @@ class cSAXSNeXusFormat(DefaultFormat): ]: self._safe_dataset(group, blade, motor, units="mm") + # ------------------------------------------------------------------------- # Main format method # ------------------------------------------------------------------------- + def format(self) -> None: - """ - Build the NeXus/HDF5 layout. + """Build the NeXus/HDF5 layout for a cSAXS scan.""" - Top-level structure - ─────────────────── - /entry - /entry_ptycho [NXentry / NXptycho] ← ptychography data + positions - /data_soft [NXentry] ← soft links to Eiger frames - /control [NXmonitor] - /instrument [NXinstrument] ← mirrors old layout.xml - /source - /insertion_device - /monochromator (TODO) - /mirror (TODO) - /XBPM3 - /slit_3 … slit_5 - /filter_set - /beam_stop_1 … 2 - /eiger_1_5 [NXdetector] - /mcs [NXdetector] - /flomni_stage [NXpositioner] - /rt_positions [NXpositioner] ← real-time encoder positions (primary) - """ + # Canonical paths referenced by multiple groups + RT_POS_PATH = "/entry/instrument/flOMNI/rt_positions" + EIGER_COLL = "/entry/collection/file_references/eiger_1_5" - # ── Root entry ─────────────────────────────────────────────────────── + # ── Root entry ──────────────────────────────────────────────────────── entry = self.storage.create_group("entry") - entry.attrs["NX_class"] = "NXentry" + entry.attrs["NX_class"] = "NXentry" entry.attrs["definition"] = "NXptycho" - # ── Ptychography entry ─────────────────────────────────────────────── + # ── /entry/sample ───────────────────────────────────────────────────── + # Primary sample group. Contains the name of the mounted sample and a + # link to the real-time scan positions. Generic samx/samy are recorded + # here so the group is meaningful for non-flOMNI configurations too. + sample = entry.create_group("sample") + sample.attrs["NX_class"] = "NXsample" + # Soft-link name directly to the value BEC recorded in the collection. + # Only written when flomni_samples is present; other configs leave name absent. + if "flomni_samples" in self.device_manager.devices: + self._safe_soft_link( + sample, "name", + "/entry/collection/devices/flomni_samples" + "/flomni_samples_sample_names_sample0/value", + ) + # Generic coarse stage positions (meaningful in non-flOMNI setups) + self._safe_dataset(sample, "x_translation", "samx", units="mm") + self._safe_dataset(sample, "y_translation", "samy", units="mm") + # Real-time encoder positions — the primary scan coordinate + self._safe_soft_link(sample, "positions", RT_POS_PATH) + + # ── /entry/entry_ptycho ─────────────────────────────────────────────── + # Generic ptychography entry. Detector data and scan positions are + # linked in from the instrument groups so this entry is self-contained + # for downstream reconstruction codes. entry_ptycho = entry.create_group("entry_ptycho") - entry_ptycho.attrs["NX_class"] = "NXentry" + entry_ptycho.attrs["NX_class"] = "NXentry" entry_ptycho.attrs["definition"] = "NXptycho" - # NXdata — primary signal is detector frames; positions index the scan nxdata = entry_ptycho.create_group("data") nxdata.attrs["NX_class"] = "NXdata" - nxdata.attrs["signal"] = "data" - - # Canonical path for rt_positions — soft-linked from both entry_ptycho - # and the flomni_stage instrument group below. - rt_positions_instrument_path = "/entry/instrument/flomni_stage/rt_positions" - - # Link Eiger frames into NXdata + nxdata.attrs["signal"] = "data" + # Detector frames try: for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys(): - nxdata.create_soft_link(k, f"/entry/collection/file_references/eiger_1_5/{k}") + self._safe_soft_link(nxdata, k, f"{EIGER_COLL}/{k}") except Exception: pass + # Scan positions + self._safe_soft_link(nxdata, "positions", RT_POS_PATH) - try: - nxdata.create_soft_link("positions", rt_positions_instrument_path) - except Exception: - pass + # Link to the primary sample group + self._safe_soft_link(entry_ptycho, "sample", "/entry/sample") - # NXsample — mirrors old layout.xml /entry/sample - sample = entry_ptycho.create_group("sample") - sample.attrs["NX_class"] = "NXsample" - try: - sample.create_dataset("name", data=self.data.get("samplename", "")) - except Exception: - pass - try: - sample.create_dataset("description", data=self.data.get("sample_description", "")) - except Exception: - pass - self._safe_dataset(sample, "x_coarse_stage", "fsamx", units="mm") - self._safe_dataset(sample, "y_coarse_stage", "fsamy", units="mm") - # TODO: temperature_log — add device when available - # self._safe_dataset(sample, "temperature_log", "temp", units="K") - try: - sample.create_soft_link("positions", rt_positions_instrument_path) - except Exception: - pass - - # ── Soft links to Eiger data (convenience, mirrors old hardlink /entry/data) ── + # ── /entry/data_soft ────────────────────────────────────────────────── + # Convenience group mirroring the old /entry/data hardlink from layout.xml. data_soft = entry.create_group("data_soft") data_soft.attrs["NX_class"] = "NXentry" try: for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys(): - data_soft.create_soft_link(k, f"/entry/collection/file_references/eiger_1_5/{k}") + self._safe_soft_link(data_soft, k, f"{EIGER_COLL}/{k}") except Exception: pass - # ── Control (beam monitor) ──────────────────────────────────────────── + # ── /entry/control ──────────────────────────────────────────────────── control = entry.create_group("control") control.attrs["NX_class"] = "NXmonitor" control.create_dataset("mode", data="monitor") - # TODO: beam intensity monitor integral — add device when available + # TODO: beam intensity integral — add device when available # self._safe_dataset(control, "integral", "bpm_sum", units="NX_DIMENSIONLESS") - # ── Instrument ──────────────────────────────────────────────────────── + # ── /entry/instrument ───────────────────────────────────────────────── instrument = entry.create_group("instrument") instrument.attrs["NX_class"] = "NXinstrument" - instrument.create_dataset("name", data="cSAXS beamline / OMNYfluoX") + instrument.create_dataset("name", data="cSAXS beamline") # ── Source ──────────────────────────────────────────────────────────── + # Numerical values are currently unknown and stored as 0. + # Will be updated once the corresponding devices are in BEC. source = instrument.create_group("source") source.attrs["NX_class"] = "NXsource" source.create_dataset("type", data="Synchrotron X-ray Source") source.create_dataset("name", data="Swiss Light Source") source.create_dataset("probe", data="x-ray") - source.create_dataset("sigma_x", data=0.202).attrs["units"] = "mm" - source.create_dataset("sigma_y", data=0.018).attrs["units"] = "mm" - source.create_dataset("divergence_x", data=0.000135).attrs["units"] = "radians" - source.create_dataset("divergence_y", data=0.000025).attrs["units"] = "radians" - # TODO: ring current — add device when available + source.create_dataset("sigma_x", data=0.0).attrs["units"] = "mm" + source.create_dataset("sigma_y", data=0.0).attrs["units"] = "mm" + source.create_dataset("divergence_x", data=0.0).attrs["units"] = "radians" + source.create_dataset("divergence_y", data=0.0).attrs["units"] = "radians" + # TODO: current — add device when available # self._safe_dataset(source, "current", "curr", units="mA") - # ── Insertion device ───────────────────────────────────────────────── + # ── Insertion device ────────────────────────────────────────────────── insertion_device = instrument.create_group("insertion_device") insertion_device.attrs["NX_class"] = "NXinsertion_device" insertion_device.create_dataset("type", data="undulator") - insertion_device.create_dataset("k", data=2.46) - insertion_device.create_dataset("length", data=1820.0).attrs["units"] = "mm" - # TODO: undulator gap — add device when available + insertion_device.create_dataset("k", data=0.0) + insertion_device.create_dataset("length", data=0.0).attrs["units"] = "mm" + # TODO: gap — add device when available # self._safe_dataset(insertion_device, "gap", "idgap", units="mm") - # ── Monochromator ──────────────────────────────────────────────────── - # TODO: mokev, moth1, mobd not yet in device list - # mono = instrument.create_group("monochromator") - # mono.attrs["NX_class"] = "NXmonochromator" - # mono.create_dataset("type", data="Double crystal fixed exit monochromator.") - # try: - # mokev_val = self.get_entry("mokev") - # wavelength = mono.create_dataset( - # "wavelength", data=12.3984193 / (np.asarray(mokev_val) + 1e-9) - # ) - # wavelength.attrs["units"] = "Angstrom" - # energy = mono.create_dataset("energy", data=mokev_val) - # energy.attrs["units"] = "keV" - # except Exception: - # pass + # ── Monochromator ───────────────────────────────────────────────────── + # ccm_energy is a baseline device and is recorded in the scan data. + mono = instrument.create_group("monochromator") + mono.attrs["NX_class"] = "NXmonochromator" + mono.create_dataset("type", data="Double crystal fixed exit monochromator.") + try: + energy_kev = self.get_entry("ccm_energy") + energy_arr = np.asarray(energy_kev, dtype=float) + en_ds = mono.create_dataset("energy", data=energy_arr) + en_ds.attrs["units"] = "keV" + with np.errstate(divide="ignore", invalid="ignore"): + wavelength = np.where(energy_arr != 0, 12.3984193 / energy_arr, 0.0) + wl_ds = mono.create_dataset("wavelength", data=wavelength) + wl_ds.attrs["units"] = "Angstrom" + except Exception: + pass + # TODO: crystal angles — add moth1 / mobd when available # crystal_1 = mono.create_group("crystal_1") # crystal_1.attrs["NX_class"] = "NXcrystal" # crystal_1.create_dataset("usage", data="Bragg") @@ -243,59 +256,39 @@ class cSAXSNeXusFormat(DefaultFormat): # "description", # data=( # "Grazing incidence mirror to reject high-harmonic wavelengths. " - # "Three coating options depending on X-ray energy: " - # "no coating (SiO2), rhodium (Rh) or platinum (Pt)." + # "Three coating options: no coating (SiO2), rhodium (Rh), platinum (Pt)." # ), # ) # mirror.create_dataset("substrate_material", data="SiO2") - # self._safe_dataset(mirror, "incident_angle", "mith", units="degrees") - # self._safe_dataset(mirror, "coating_material", "mirror_coating", units="NX_CHAR") - # self._safe_dataset(mirror, "bend_y", "mibd", units="NX_DIMENSIONLESS") + # self._safe_dataset(mirror, "incident_angle", "mith", units="degrees") + # self._safe_dataset(mirror, "coating_material", "mirror_coating", units="NX_CHAR") + # self._safe_dataset(mirror, "bend_y", "mibd", units="NX_DIMENSIONLESS") - # ── Upstream slits (optics hutch) ──────────────────────────────────── - # TODO: slit_0 / slit_1 / slit_2 devices not yet in device list + # ── Upstream slits (optics hutch) ───────────────────────────────────── + # TODO: slit_0 / slit_1 / slit_2 motors not yet in BEC device list # slit_0 = instrument.create_group("slit_0") - # slit_0.attrs["NX_class"] = "NXslit" - # slit_0.create_dataset("material", data="OFHC Cu") - # slit_0.create_dataset("description", data="Horizontal secondary source slit") - # self._safe_dataset(slit_0, "x_gap", "sl0wh", units="mm") - # self._safe_dataset(slit_0, "x_translation", "sl0ch", units="mm") - # + # ... # slit_1 = instrument.create_group("slit_1") - # slit_1.attrs["NX_class"] = "NXslit" - # slit_1.create_dataset("material", data="OFHC Cu") - # slit_1.create_dataset("description", data="Slit 1, optics hutch") - # self._safe_dataset(slit_1, "x_gap", "sl1wh", units="mm") - # self._safe_dataset(slit_1, "y_gap", "sl1wv", units="mm") - # self._safe_dataset(slit_1, "x_translation", "sl1ch", units="mm") - # self._safe_dataset(slit_1, "height", "sl1cv", units="mm") - # + # ... # slit_2 = instrument.create_group("slit_2") - # slit_2.attrs["NX_class"] = "NXslit" - # slit_2.create_dataset("material", data="Ag") - # slit_2.create_dataset("description", data="Slit 2, optics hutch") - # self._safe_dataset(slit_2, "x_gap", "sl2wh", units="mm") - # self._safe_dataset(slit_2, "y_gap", "sl2wv", units="mm") - # self._safe_dataset(slit_2, "x_translation", "sl2ch", units="mm") - # self._safe_dataset(slit_2, "height", "sl2cv", units="mm") + # ... - # ── XBPM3 (experimental hutch beam position monitor) ───────────────── - # Note: xbpm3x/xbpm3y are the *stage* motor positions used to align - # the monitor, not the signal readout channels (sum/x/y/skew). - # Signal readouts are TODO once MCS channels are mapped. + # ── XBPM3 ───────────────────────────────────────────────────────────── + # xbpm3x/xbpm3y are stage motor positions for aligning the monitor. + # Signal readouts (sum/x/y/skew) are TODO once MCS channels are mapped. xbpm3 = instrument.create_group("XBPM3") - xbpm3.attrs["NX_class"] = "NXdetector" + xbpm3.attrs["NX_class"] = "NXdetector" xbpm3.attrs["description"] = "X-ray beam position monitor 3, experimental hutch" self._safe_dataset(xbpm3, "x_stage", "xbpm3x", units="mm", description="XBPM3 stage x-translation") self._safe_dataset(xbpm3, "y_stage", "xbpm3y", units="mm", description="XBPM3 stage y-translation") - # TODO: add signal readout sub-groups once MCS channels are configured + # TODO: signal readout sub-groups once MCS channels are configured # for suffix, entry_name, desc in [ # ("sum", "bpm3s", "Sum of counts for the four quadrants."), - # ("x", "bpm3x", "Normalized difference of counts between left and right quadrants."), - # ("y", "bpm3y", "Normalized difference of counts between high and low quadrants."), - # ("skew", "bpm3z", "Normalized difference of counts between diagonal quadrants."), + # ("x", "bpm3x", "Normalized diff, left vs right quadrants."), + # ("y", "bpm3y", "Normalized diff, high vs low quadrants."), + # ("skew", "bpm3z", "Normalized diff, diagonal quadrants."), # ]: # g = xbpm3.create_group(f"XBPM3_{suffix}") # self._safe_dataset(g, "data", entry_name, units="NX_DIMENSIONLESS") @@ -306,13 +299,10 @@ class cSAXSNeXusFormat(DefaultFormat): slit_3.attrs["NX_class"] = "NXslit" slit_3.create_dataset("material", data="Si") slit_3.create_dataset("description", data="Slit 3, experimental hutch, exposure box") - slit_3.attrs["blade_note"] = ( - "Individual blade positions stored. " - "x_gap = outer_x - inner_x, y_gap = top_y - bottom_y" - ) + # TODO: gap / centre require per-slit calibration offset — add later self._slit_blades(slit_3, "sl3") - # ── Filter set (attenuator) ─────────────────────────────────────────── + # ── Filter set ──────────────────────────────────────────────────────── filter_set = instrument.create_group("filter_set") filter_set.attrs["NX_class"] = "NXattenuator" filter_set.create_dataset("material", data="Si") @@ -324,21 +314,15 @@ class cSAXSNeXusFormat(DefaultFormat): ), ) for i in range(1, 5): - self._safe_dataset(filter_set, f"stage_{i}_x", f"filter_array_{i}_x", units="mm") - # TODO: attenuator_transmission — requires a computed/mapped signal: - # attenuator_transmission = 10 ^ (ftrans) - # self._safe_dataset(filter_set, "attenuator_transmission", "ftrans", - # units="NX_DIMENSIONLESS") + self._safe_dataset(filter_set, f"stage_{i}_x", + f"filter_array_{i}_x", units="mm") + # TODO: attenuator_transmission = 10^(ftrans) once device is available # ── Slit 4 (experimental hutch, exposure box) ───────────────────────── slit_4 = instrument.create_group("slit_4") slit_4.attrs["NX_class"] = "NXslit" slit_4.create_dataset("material", data="Ge") slit_4.create_dataset("description", data="Slit 4, experimental hutch, exposure box") - slit_4.attrs["blade_note"] = ( - "Individual blade positions stored. " - "x_gap = outer_x - inner_x, y_gap = top_y - bottom_y" - ) self._slit_blades(slit_4, "sl4") # ── Slit 5 (experimental hutch, exposure box) ───────────────────────── @@ -346,10 +330,6 @@ class cSAXSNeXusFormat(DefaultFormat): slit_5.attrs["NX_class"] = "NXslit" slit_5.create_dataset("material", data="Si") slit_5.create_dataset("description", data="Slit 5, experimental hutch, exposure box") - slit_5.attrs["blade_note"] = ( - "Individual blade positions stored. " - "x_gap = outer_x - inner_x, y_gap = top_y - bottom_y" - ) self._slit_blades(slit_5, "sl5") # ── Beam stop 1 ──────────────────────────────────────────────────────── @@ -359,27 +339,25 @@ class cSAXSNeXusFormat(DefaultFormat): beam_stop_1.create_dataset("size", data=3.0).attrs["units"] = "mm" self._safe_dataset(beam_stop_1, "x", "bs1x", units="mm") self._safe_dataset(beam_stop_1, "y", "bs1y", units="mm") - # TODO: diode signal behind beam stop 1 once device is available - # self._safe_dataset(beam_stop_1, "data", "diode1", units="NX_DIMENSIONLESS") + # TODO: diode signal behind beam stop 1 when device is available # ── Beam stop 2 ──────────────────────────────────────────────────────── beam_stop_2 = instrument.create_group("beam_stop_2") beam_stop_2.attrs["NX_class"] = "NXbeam_stop" beam_stop_2.create_dataset("description", data="rectangular") - beam_stop_2.create_dataset("size_x", data=5.0).attrs["units"] = "mm" - beam_stop_2.create_dataset("size_y", data=2.25).attrs["units"] = "mm" + beam_stop_2.create_dataset("size_x", data=5.0).attrs["units"] = "mm" + beam_stop_2.create_dataset("size_y", data=2.25).attrs["units"] = "mm" self._safe_dataset(beam_stop_2, "x", "bs2x", units="mm") self._safe_dataset(beam_stop_2, "y", "bs2y", units="mm") - # TODO: diode (scattered/transmitted signal) once device is available - # self._safe_dataset(beam_stop_2, "data", "diode", units="NX_DIMENSIONLESS") + # TODO: diode (transmitted signal) when device is available - # ── Detector translation ────────────────────────────────────────────── + # ── Detector translation ─────────────────────────────────────────────── self._safe_dataset( instrument, "detector_translation_x", "dettrx", - units="mm", description="Detector x-translation stage" + units="mm", description="Detector x-translation stage", ) - # ── Eiger 1.5M detector ──────────────────────────────────────────────── + # ── Eiger 1.5M detector ─────────────────────────────────────────────── if ( "eiger_1_5" in self.device_manager.devices and self.device_manager.devices.eiger_1_5.enabled @@ -387,11 +365,11 @@ class cSAXSNeXusFormat(DefaultFormat): ): eiger = instrument.create_group("eiger_1_5") eiger.attrs["NX_class"] = "NXdetector" - eiger.create_dataset("x_pixel_size", data=75.0).attrs["units"] = "um" - eiger.create_dataset("y_pixel_size", data=75.0).attrs["units"] = "um" - eiger.create_dataset("polar_angle", data=0.0).attrs["units"] = "degrees" - eiger.create_dataset("azimuthal_angle", data=0.0).attrs["units"] = "degrees" - eiger.create_dataset("rotation_angle", data=0.0).attrs["units"] = "degrees" + eiger.create_dataset("x_pixel_size", data=75.0).attrs["units"] = "um" + eiger.create_dataset("y_pixel_size", data=75.0).attrs["units"] = "um" + eiger.create_dataset("polar_angle", data=0.0).attrs["units"] = "degrees" + eiger.create_dataset("azimuthal_angle", data=0.0).attrs["units"] = "degrees" + eiger.create_dataset("rotation_angle", data=0.0).attrs["units"] = "degrees" eiger.create_dataset( "description", data="Eiger 1.5M detector, in-house developed, Paul Scherrer Institute", @@ -404,21 +382,17 @@ class cSAXSNeXusFormat(DefaultFormat): orientation.attrs["description"] = ( "Orientation defines the number of counterclockwise rotations by 90 deg " "followed by a transposition to reach the 'cameraman orientation', " - "that is looking towards the beam." + "looking towards the beam." ) orientation.create_dataset("transpose", data=1) orientation.create_dataset("rot90", data=3) - # Soft-link the Eiger data frames recorded in the collection + # Soft-link recorded frame data from the BEC collection try: for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys(): - eiger.create_soft_link( - k, f"/entry/collection/file_references/eiger_1_5/{k}" - ) + self._safe_soft_link(eiger, k, f"{EIGER_COLL}/{k}") except Exception: pass - - # External link to the pixel mask stored in the Eiger master file - # Path inside master: /entry/instrument/detector/pixel_mask + # External link to pixel mask in the Eiger master file try: eiger.create_ext_link( "pixel_mask", @@ -429,564 +403,68 @@ class cSAXSNeXusFormat(DefaultFormat): pass # ── MCS (multi-channel scaler) ───────────────────────────────────────── - if "mcs" in self.device_manager.devices and self.device_manager.devices.mcs.enabled: + if ( + "mcs" in self.device_manager.devices + and self.device_manager.devices.mcs.enabled + ): mcs_group = instrument.create_group("mcs") - mcs_group.attrs["NX_class"] = "NXdetector" + mcs_group.attrs["NX_class"] = "NXdetector" mcs_group.attrs["description"] = "MCS card cSAXS — multi-channel scaler" - try: - mcs_group.create_soft_link("data", "/entry/collection/devices/mcs") - except Exception: - pass + self._safe_soft_link(mcs_group, "data", "/entry/collection/devices/mcs") - # ── Flomni sample / nano-positioning stage ───────────────────────────── - flomni = instrument.create_group("flomni_stage") - flomni.attrs["NX_class"] = "NXpositioner" - flomni.attrs["description"] = "Flomni nano-positioning sample stage" - self._safe_dataset(flomni, "x", "fsamx", units="mm") - self._safe_dataset(flomni, "y", "fsamy", units="mm") - self._safe_dataset(flomni, "rot_y", "fsamroy", units="degrees") - # Additional Flomni axes (coarse + optics translations) — stored if scanned - self._safe_dataset(flomni, "transfer_x", "ftransx", units="mm") - self._safe_dataset(flomni, "transfer_y", "ftransy", units="mm") - self._safe_dataset(flomni, "transfer_z", "ftransz", units="mm") - self._safe_dataset(flomni, "tray", "ftray", units="mm") - self._safe_dataset(flomni, "eye_x", "feyex", units="mm") - self._safe_dataset(flomni, "eye_y", "feyey", units="mm") - self._safe_dataset(flomni, "opt_x", "foptx", units="mm") - self._safe_dataset(flomni, "opt_y", "fopty", units="mm") - self._safe_dataset(flomni, "opt_z", "foptz", units="mm") - self._safe_dataset(flomni, "track_y", "ftracky", units="mm") - self._safe_dataset(flomni, "track_z", "ftrackz", units="mm") + # ── flOMNI ──────────────────────────────────────────────────────────── + # flomni_samples is used as the sentinel for the entire flOMNI setup. + # If it is absent from the device_manager (removed from config) the + # whole group is omitted. Individual datasets inside are still each + # guarded by _safe_dataset / _safe_soft_link in case a specific motor + # is temporarily disabled without removing the full setup. + if "flomni_samples" in self.device_manager.devices: + flomni = instrument.create_group("flOMNI") + flomni.attrs["NX_class"] = "NXpositioner" + flomni.attrs["description"] = "flOMNI flexible tOMography Nano Imaging" - # ── Real-time encoder positions (rt_positions) ───────────────────────── - # A single soft link exposes the entire rt_positions folder recorded by - # RtFlomniFlyer at the canonical instrument path so reconstruction codes - # always find it at /entry/instrument/flomni_stage/rt_positions. - try: - flomni.create_soft_link( - "rt_positions", "/entry/collection/devices/rt_positions" + # Galil motors — coarse sample stage + self._safe_dataset(flomni, "fsamx", "fsamx", units="mm", description="Sample coarse X") + self._safe_dataset(flomni, "fsamy", "fsamy", units="mm", description="Sample coarse Y") + self._safe_dataset(flomni, "fsamroy", "fsamroy", units="degrees", description="Sample rotation") + + # Galil motors — sample transfer / tray + self._safe_dataset(flomni, "ftransx", "ftransx", units="mm", description="Sample transfer X") + self._safe_dataset(flomni, "ftransy", "ftransy", units="mm", description="Sample transfer Y") + self._safe_dataset(flomni, "ftransz", "ftransz", units="mm", description="Sample transfer Z") + self._safe_dataset(flomni, "ftray", "ftray", units="mm", description="Sample transfer tray") + + # Galil motors — laser tracker + self._safe_dataset(flomni, "ftracky", "ftracky", units="mm", description="Laser tracker coarse Y") + self._safe_dataset(flomni, "ftrackz", "ftrackz", units="mm", description="Laser tracker coarse Z") + + # Galil motors — X-ray eye + self._safe_dataset(flomni, "feyex", "feyex", units="mm", description="X-ray eye X") + self._safe_dataset(flomni, "feyey", "feyey", units="mm", description="X-ray eye Y") + + # Galil motors — optics (zone plate) + self._safe_dataset(flomni, "foptx", "foptx", units="mm", description="Optics X") + self._safe_dataset(flomni, "fopty", "fopty", units="mm", description="Optics Y") + self._safe_dataset(flomni, "foptz", "foptz", units="mm", description="Optics Z") + + # Galil motor — heater + self._safe_dataset(flomni, "fheater", "fheater", units="mm", description="Heater Y") + + # Smaract motors — OSA (order-sorting aperture) + self._safe_dataset(flomni, "fosax", "fosax", units="mm", description="OSA X") + self._safe_dataset(flomni, "fosay", "fosay", units="mm", description="OSA Y") + self._safe_dataset(flomni, "fosaz", "fosaz", units="mm", description="OSA Z") + + # Temperature and humidity sensor (soft link to BEC collection entry) + self._safe_soft_link( + flomni, "flomni_temphum", + "/entry/collection/devices/flomni_temphum", ) - except Exception: - pass - - -# from __future__ import annotations - -# import numpy as np -# from bec_server.file_writer.default_writer import DefaultFormat - - -# class cSAXSNeXusFormat(DefaultFormat): -# """ -# NeXus file format for cSAXS beamline. This format is based on the default NeXus format, but with some additional entries specific to the cSAXS beamline. The structure of the file is based on the NeXus standard, but with some additional groups and datasets specific to the cSAXS beamline. -# """ - -# def format(self) -> None: -# """ -# Prepare the NeXus file format. -# Override this method in file writer plugins to customize the HDF5 file format. - -# The class provides access to the following attributes: -# - self.storage: The HDF5Storage object. -# - self.data: The data dictionary. -# - self.file_references: The file references dictionary, which has the link to external data. -# - self.device_manager: The DeviceManagerBase object. -# - self.get_entry(name, default=None): Helper method to get an entry from the data dictionary. - -# See also: :class:`bec_server.file_writer.file_writer.HDF5Storage`. - -# """ -# entry = self.storage.create_group("entry") -# # ------------------------------------------------------------------ -# # Create ptychography entry -# # ------------------------------------------------------------------ -# entry_ptycho = entry.create_group("entry_ptycho") -# entry_ptycho.attrs["NX_class"] = "NXentry" -# entry_ptycho.attrs["definition"] = "NXptycho" - -# # ------------------------------------------------------------------ -# # NXdata group -# # ------------------------------------------------------------------ -# nxdata = entry_ptycho.create_group("data") -# nxdata.attrs["NX_class"] = "NXdata" -# nxdata.attrs["signal"] = "data" - -# # ------------------------------------------------------------------ -# # Link Eiger data (external) -# # ------------------------------------------------------------------ -# data = entry.create_group("data") -# data.attrs["NX_class"] = "NXentry" -# # Either ext link! -# #data.create_ext_link("data", self.file_references["eiger_1_5"].file_path, "entry/data") -# # Or soft link to already linked path -# data_soft = entry.create_group("data_soft") -# data_soft.attrs["NX_class"] = "NXentry" -# for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys(): -# data_soft.create_soft_link(k, f"/entry/collection/file_references/eiger_1_5/{k}") - -# # ------------------------------------------------------------------ -# # Link positions (soft links) -# # ------------------------------------------------------------------ -# positions_path = "/entry/collection/devices/rt_positions" - -# try: -# nxdata.create_soft_link("positions", positions_path) -# except Exception: -# pass - -# # ------------------------------------------------------------------ -# # Sample group -# # ------------------------------------------------------------------ -# sample = entry_ptycho.create_group("sample") -# sample.attrs["NX_class"] = "NXsample" - -# try: -# sample.create_soft_link("positions", positions_path) -# except Exception: -# pass -# # entry = self.storage.create_group("entry") - -# # # /entry/control -# # control = entry.create_group("control") -# # control.attrs["NX_class"] = "NXmonitor" -# # control.create_dataset(name="mode", data="monitor") - -# # ######### -# # # EXAMPLE for soft link -# # ######### -# # # /entry/data -# # if "eiger_4" in self.device_manager.devices: -# # entry.create_soft_link(name="data", target="/entry/instrument/eiger_4") - -# # ######## -# # # EXAMPLE for external link -# # ######## -# # # control = entry.create_group("sample") -# # # control.create_ext_link("data", self.file_references["eiger9m"]["path"], "EG9M/data") - -# # # /entry/sample -# # control = entry.create_group("sample") -# # control.attrs["NX_class"] = "NXsample" -# # control.create_dataset(name="name", data=self.data.get("samplename")) -# # control.create_dataset(name="description", data=self.data.get("sample_description")) - -# # # /entry/instrument -# # instrument = entry.create_group("instrument") -# # instrument.attrs["NX_class"] = "NXinstrument" - -# # source = instrument.create_group("source") -# # source.attrs["NX_class"] = "NXsource" -# # source.create_dataset(name="type", data="Synchrotron X-ray Source") -# # source.create_dataset(name="name", data="Swiss Light Source") -# # source.create_dataset(name="probe", data="x-ray") - -# # # /entry -# # entry = self.storage.create_group("entry") -# # entry.attrs["NX_class"] = "NXentry" -# # entry.attrs["definition"] = "NXsas" -# # entry.attrs["start_time"] = self.data.get("start_time") -# # entry.attrs["end_time"] = self.data.get("end_time") -# # entry.attrs["version"] = 1.0 - -# # # /entry/control -# # control = entry.create_group("control") -# # control.attrs["NX_class"] = "NXmonitor" -# # control.create_dataset(name="mode", data="monitor") -# # control.create_dataset(name="integral", data=self.get_entry("bpm4i")) - -# # # /entry/data -# # main_data = entry.create_group("data") -# # main_data.attrs["NX_class"] = "NXdata" -# # if "eiger_4" in self.device_manager.devices: -# # main_data.create_soft_link(name="data", target="/entry/instrument/eiger_4/data") -# # elif "eiger9m" in self.device_manager.devices: -# # main_data.create_soft_link(name="data", target="/entry/instrument/eiger9m/data") -# # elif "pilatus_2" in self.device_manager.devices: -# # main_data.create_soft_link(name="data", target="/entry/instrument/pilatus_2/data") - -# # # /entry/sample -# # control = entry.create_group("sample") -# # control.attrs["NX_class"] = "NXsample" -# # control.create_dataset(name="name", data=self.get_entry("samplename")) -# # control.create_dataset(name="description", data=self.data.get("sample_description")) -# # x_translation = control.create_dataset(name="x_translation", data=self.get_entry("samx")) -# # x_translation.attrs["units"] = "mm" -# # y_translation = control.create_dataset(name="y_translation", data=self.get_entry("samy")) -# # y_translation.attrs["units"] = "mm" -# # temperature_log = control.create_dataset( -# # name="temperature_log", data=self.get_entry("temp") -# # ) -# # temperature_log.attrs["units"] = "K" - -# # # /entry/instrument -# # instrument = entry.create_group("instrument") -# # instrument.attrs["NX_class"] = "NXinstrument" -# # instrument.create_dataset(name="name", data="cSAXS beamline") - -# # source = instrument.create_group("source") -# # source.attrs["NX_class"] = "NXsource" -# # source.create_dataset(name="type", data="Synchrotron X-ray Source") -# # source.create_dataset(name="name", data="Swiss Light Source") -# # source.create_dataset(name="probe", data="x-ray") -# # distance = source.create_dataset( -# # name="distance", data=-33800 - np.asarray(self.get_entry("samz", 0)) -# # ) -# # distance.attrs["units"] = "mm" -# # sigma_x = source.create_dataset(name="sigma_x", data=0.202) -# # sigma_x.attrs["units"] = "mm" -# # sigma_y = source.create_dataset(name="sigma_y", data=0.018) -# # sigma_y.attrs["units"] = "mm" -# # divergence_x = source.create_dataset(name="divergence_x", data=0.000135) -# # divergence_x.attrs["units"] = "radians" -# # divergence_y = source.create_dataset(name="divergence_y", data=0.000025) -# # divergence_y.attrs["units"] = "radians" -# # current = source.create_dataset(name="current", data=self.get_entry("curr")) -# # current.attrs["units"] = "mA" - -# # insertion_device = instrument.create_group("insertion_device") -# # insertion_device.attrs["NX_class"] = "NXinsertion_device" -# # source.create_dataset(name="type", data="undulator") -# # gap = source.create_dataset(name="gap", data=self.get_entry("idgap")) -# # gap.attrs["units"] = "mm" -# # k = source.create_dataset(name="k", data=2.46) -# # k.attrs["units"] = "NX_DIMENSIONLESS" -# # length = source.create_dataset(name="length", data=1820) -# # length.attrs["units"] = "mm" - -# # slit_0 = instrument.create_group("slit_0") -# # slit_0.attrs["NX_class"] = "NXslit" -# # source.create_dataset(name="material", data="OFHC Cu") -# # source.create_dataset(name="description", data="Horizontal secondary source slit") -# # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl0wh")) -# # x_gap.attrs["units"] = "mm" -# # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl0ch")) -# # x_translation.attrs["units"] = "mm" -# # distance = source.create_dataset( -# # name="distance", data=-21700 - np.asarray(self.get_entry("samz", 0)) -# # ) -# # distance.attrs["units"] = "mm" - -# # slit_1 = instrument.create_group("slit_1") -# # slit_1.attrs["NX_class"] = "NXslit" -# # source.create_dataset(name="material", data="OFHC Cu") -# # source.create_dataset(name="description", data="Horizontal secondary source slit") -# # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl1wh")) -# # x_gap.attrs["units"] = "mm" -# # y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl1wv")) -# # y_gap.attrs["units"] = "mm" -# # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl1ch")) -# # x_translation.attrs["units"] = "mm" -# # height = source.create_dataset(name="x_translation", data=self.get_entry("sl1ch")) -# # height.attrs["units"] = "mm" -# # distance = source.create_dataset( -# # name="distance", data=-7800 - np.asarray(self.get_entry("samz", 0)) -# # ) -# # distance.attrs["units"] = "mm" - -# # mono = instrument.create_group("monochromator") -# # mono.attrs["NX_class"] = "NXmonochromator" -# # mokev = self.data.get("mokev", {}) -# # if mokev: -# # if isinstance(mokev, list): -# # mokev = mokev[0] -# # wavelength = mono.create_dataset( -# # name="wavelength", data=12.3984193 / (mokev.get("mokev").get("value") + 1e-9) -# # ) -# # wavelength.attrs["units"] = "Angstrom" -# # energy = mono.create_dataset(name="energy", data=mokev.get("mokev").get("value")) -# # energy.attrs["units"] = "keV" -# # mono.create_dataset(name="type", data="Double crystal fixed exit monochromator.") -# # distance = mono.create_dataset( -# # name="distance", data=-5220 - np.asarray(self.get_entry("samz", 0)) -# # ) -# # distance.attrs["units"] = "mm" - -# # crystal_1 = mono.create_group("crystal_1") -# # crystal_1.attrs["NX_class"] = "NXcrystal" -# # crystal_1.create_dataset(name="usage", data="Bragg") -# # crystal_1.create_dataset(name="order_no", data="1") -# # crystal_1.create_dataset(name="reflection", data="[1 1 1]") -# # bragg_angle = crystal_1.create_dataset(name="bragg_angle", data=self.get_entry("moth1")) -# # bragg_angle.attrs["units"] = "degrees" - -# # crystal_2 = mono.create_group("crystal_2") -# # crystal_2.attrs["NX_class"] = "NXcrystal" -# # crystal_2.create_dataset(name="usage", data="Bragg") -# # crystal_2.create_dataset(name="order_no", data="2") -# # crystal_2.create_dataset(name="reflection", data="[1 1 1]") -# # bragg_angle = crystal_2.create_dataset(name="bragg_angle", data=self.get_entry("moth1")) -# # bragg_angle.attrs["units"] = "degrees" -# # bend_x = crystal_2.create_dataset(name="bend_x", data=self.get_entry("mobd")) -# # bend_x.attrs["units"] = "degrees" - -# # xbpm4 = instrument.create_group("XBPM4") -# # xbpm4.attrs["NX_class"] = "NXdetector" -# # xbpm4_sum = xbpm4.create_group("XBPM4_sum") -# # xbpm4_sum_data = xbpm4_sum.create_dataset(name="data", data=self.get_entry("bpm4s")) -# # xbpm4_sum_data.attrs["units"] = "NX_DIMENSIONLESS" -# # xbpm4_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.") -# # xbpm4_x = xbpm4.create_group("XBPM4_x") -# # xbpm4_x_data = xbpm4_x.create_dataset(name="data", data=self.get_entry("bpm4x")) -# # xbpm4_x_data.attrs["units"] = "NX_DIMENSIONLESS" -# # xbpm4_x.create_dataset( -# # name="description", -# # data="Normalized difference of counts between left and right quadrants.", -# # ) -# # xbpm4_y = xbpm4.create_group("XBPM4_y") -# # xbpm4_y_data = xbpm4_y.create_dataset(name="data", data=self.get_entry("bpm4y")) -# # xbpm4_y_data.attrs["units"] = "NX_DIMENSIONLESS" -# # xbpm4_y.create_dataset( -# # name="description", -# # data="Normalized difference of counts between high and low quadrants.", -# # ) -# # xbpm4_skew = xbpm4.create_group("XBPM4_skew") -# # xbpm4_skew_data = xbpm4_skew.create_dataset(name="data", data=self.get_entry("bpm4z")) -# # xbpm4_skew_data.attrs["units"] = "NX_DIMENSIONLESS" -# # xbpm4_skew.create_dataset( -# # name="description", data="Normalized difference of counts between diagonal quadrants." -# # ) - -# # mirror = instrument.create_group("mirror") -# # mirror.attrs["NX_class"] = "NXmirror" -# # mirror.create_dataset(name="type", data="single") -# # mirror.create_dataset( -# # name="description", -# # data="Grazing incidence mirror to reject high-harmonic wavelengths from the monochromator. There are three coating options available that are used depending on the X-ray energy, no coating (SiO2), rhodium (Rh) or platinum (Pt).", -# # ) -# # incident_angle = mirror.create_dataset(name="incident_angle", data=self.get_entry("mith")) -# # incident_angle.attrs["units"] = "degrees" -# # substrate_material = mirror.create_dataset(name="substrate_material", data="SiO2") -# # substrate_material.attrs["units"] = "NX_CHAR" -# # coating_material = mirror.create_dataset(name="coating_material", data="SiO2") -# # coating_material.attrs["units"] = "NX_CHAR" -# # bend_y = mirror.create_dataset(name="bend_y", data="mibd") -# # bend_y.attrs["units"] = "NX_DIMENSIONLESS" -# # distance = mirror.create_dataset( -# # name="distance", data=-4370 - np.asarray(self.get_entry("samz", 0)) -# # ) -# # distance.attrs["units"] = "mm" - -# # xbpm5 = instrument.create_group("XBPM5") -# # xbpm5.attrs["NX_class"] = "NXdetector" -# # xbpm5_sum = xbpm5.create_group("XBPM5_sum") -# # xbpm5_sum_data = xbpm5_sum.create_dataset(name="data", data=self.get_entry("bpm5s")) -# # xbpm5_sum_data.attrs["units"] = "NX_DIMENSIONLESS" -# # xbpm5_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.") -# # xbpm5_x = xbpm5.create_group("XBPM5_x") -# # xbpm5_x_data = xbpm5_x.create_dataset(name="data", data=self.get_entry("bpm5x")) -# # xbpm5_x_data.attrs["units"] = "NX_DIMENSIONLESS" -# # xbpm5_x.create_dataset( -# # name="description", -# # data="Normalized difference of counts between left and right quadrants.", -# # ) -# # xbpm5_y = xbpm5.create_group("XBPM5_y") -# # xbpm5_y_data = xbpm5_y.create_dataset(name="data", data=self.get_entry("bpm5y")) -# # xbpm5_y_data.attrs["units"] = "NX_DIMENSIONLESS" -# # xbpm5_y.create_dataset( -# # name="description", -# # data="Normalized difference of counts between high and low quadrants.", -# # ) -# # xbpm5_skew = xbpm5.create_group("XBPM5_skew") -# # xbpm5_skew_data = xbpm5_skew.create_dataset(name="data", data=self.get_entry("bpm5z")) -# # xbpm5_skew_data.attrs["units"] = "NX_DIMENSIONLESS" -# # xbpm5_skew.create_dataset( -# # name="description", data="Normalized difference of counts between diagonal quadrants." -# # ) - -# # slit_2 = instrument.create_group("slit_2") -# # slit_2.attrs["NX_class"] = "NXslit" -# # source.create_dataset(name="material", data="Ag") -# # source.create_dataset(name="description", data="Slit 2, optics hutch") -# # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl2wh")) -# # x_gap.attrs["units"] = "mm" -# # y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl2wv")) -# # y_gap.attrs["units"] = "mm" -# # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl2ch")) -# # x_translation.attrs["units"] = "mm" -# # height = source.create_dataset(name="x_translation", data=self.get_entry("sl2cv")) -# # height.attrs["units"] = "mm" -# # distance = source.create_dataset( -# # name="distance", data=-3140 - np.asarray(self.get_entry("samz", 0)) -# # ) -# # distance.attrs["units"] = "mm" - -# # slit_3 = instrument.create_group("slit_3") -# # slit_3.attrs["NX_class"] = "NXslit" -# # source.create_dataset(name="material", data="Si") -# # source.create_dataset(name="description", data="Slit 3, experimental hutch, exposure box") -# # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl3wh")) -# # x_gap.attrs["units"] = "mm" -# # y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl3wv")) -# # y_gap.attrs["units"] = "mm" -# # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl3ch")) -# # x_translation.attrs["units"] = "mm" -# # height = source.create_dataset(name="x_translation", data=self.get_entry("sl3cv")) -# # height.attrs["units"] = "mm" -# # # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0)) -# # # distance.attrs["units"] = "mm" - -# # filter_set = instrument.create_group("filter_set") -# # filter_set.attrs["NX_class"] = "NXattenuator" -# # filter_set.create_dataset(name="material", data="Si") -# # filter_set.create_dataset( -# # name="description", -# # data="The filter set consists of 4 linear stages, each with five filter positions. Additionally, each one allows for an out position to allow 'no filtering'.", -# # ) -# # attenuator_transmission = filter_set.create_dataset( -# # name="attenuator_transmission", data=10 ** self.get_entry("ftrans", 0) -# # ) -# # attenuator_transmission.attrs["units"] = "NX_DIMENSIONLESS" - -# # slit_4 = instrument.create_group("slit_4") -# # slit_4.attrs["NX_class"] = "NXslit" -# # source.create_dataset(name="material", data="Si") -# # source.create_dataset(name="description", data="Slit 4, experimental hutch, exposure box") -# # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl4wh")) -# # x_gap.attrs["units"] = "mm" -# # y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl4wv")) -# # y_gap.attrs["units"] = "mm" -# # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl4ch")) -# # x_translation.attrs["units"] = "mm" -# # height = source.create_dataset(name="x_translation", data=self.get_entry("sl4cv")) -# # height.attrs["units"] = "mm" -# # # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0)) -# # # distance.attrs["units"] = "mm" - -# # slit_5 = instrument.create_group("slit_5") -# # slit_5.attrs["NX_class"] = "NXslit" -# # source.create_dataset(name="material", data="Si") -# # source.create_dataset(name="description", data="Slit 5, experimental hutch, exposure box") -# # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl5wh")) -# # x_gap.attrs["units"] = "mm" -# # y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl5wv")) -# # y_gap.attrs["units"] = "mm" -# # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl5ch")) -# # x_translation.attrs["units"] = "mm" -# # height = source.create_dataset(name="x_translation", data=self.get_entry("sl5cv")) -# # height.attrs["units"] = "mm" -# # # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0)) -# # # distance.attrs["units"] = "mm" - -# # beam_stop_1 = instrument.create_group("beam_stop_1") -# # beam_stop_1.attrs["NX_class"] = "NX_beamstop" -# # beam_stop_1.create_dataset(name="description", data="circular") -# # bms1_size = beam_stop_1.create_dataset(name="size", data=3) -# # bms1_size.attrs["units"] = "mm" -# # bms1_x = beam_stop_1.create_dataset(name="size", data=self.get_entry("bs1x")) -# # bms1_x.attrs["units"] = "mm" -# # bms1_y = beam_stop_1.create_dataset(name="size", data=self.get_entry("bs1y")) -# # bms1_y.attrs["units"] = "mm" - -# # beam_stop_2 = instrument.create_group("beam_stop_2") -# # beam_stop_2.attrs["NX_class"] = "NX_beamstop" -# # beam_stop_2.create_dataset(name="description", data="rectangular") -# # bms2_size_x = beam_stop_2.create_dataset(name="size_x", data=5) -# # bms2_size_x.attrs["units"] = "mm" -# # bms2_size_y = beam_stop_2.create_dataset(name="size_y", data=2.25) -# # bms2_size_y.attrs["units"] = "mm" -# # bms2_x = beam_stop_2.create_dataset(name="size", data=self.get_entry("bs2x")) -# # bms2_x.attrs["units"] = "mm" -# # bms2_y = beam_stop_2.create_dataset(name="size", data=self.get_entry("bs2y")) -# # bms2_y.attrs["units"] = "mm" -# # bms2_data = beam_stop_2.create_dataset(name="data", data=self.get_entry("diode")) -# # bms2_data.attrs["units"] = "NX_DIMENSIONLESS" - -# # if ( -# # "eiger1p5m" in self.device_manager.devices -# # and self.device_manager.devices.eiger1p5m.enabled -# # ): -# # eiger_4 = instrument.create_group("eiger_4") -# # eiger_4.attrs["NX_class"] = "NXdetector" -# # x_pixel_size = eiger_4.create_dataset(name="x_pixel_size", data=75) -# # x_pixel_size.attrs["units"] = "um" -# # y_pixel_size = eiger_4.create_dataset(name="y_pixel_size", data=75) -# # y_pixel_size.attrs["units"] = "um" -# # polar_angle = eiger_4.create_dataset(name="polar_angle", data=0) -# # polar_angle.attrs["units"] = "degrees" -# # azimuthal_angle = eiger_4.create_dataset(name="azimuthal_angle", data=0) -# # azimuthal_angle.attrs["units"] = "degrees" -# # rotation_angle = eiger_4.create_dataset(name="rotation_angle", data=0) -# # rotation_angle.attrs["units"] = "degrees" -# # description = eiger_4.create_dataset( -# # name="description", data="Single-photon counting detector, 320 micron-thick Si chip" -# # ) -# # orientation = eiger_4.create_group("orientation") -# # orientation.attrs["description"] = ( -# # "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam." -# # ) -# # orientation.create_dataset(name="transpose", data=1) -# # orientation.create_dataset(name="rot90", data=3) - -# # if ( -# # "eiger9m" in self.device_manager.devices -# # and self.device_manager.devices.eiger9m.enabled -# # and "eiger9m" in self.file_references -# # ): -# # eiger9m = instrument.create_group("eiger9m") -# # eiger9m.attrs["NX_class"] = "NXdetector" -# # x_pixel_size = eiger9m.create_dataset(name="x_pixel_size", data=75) -# # x_pixel_size.attrs["units"] = "um" -# # y_pixel_size = eiger9m.create_dataset(name="y_pixel_size", data=75) -# # y_pixel_size.attrs["units"] = "um" -# # polar_angle = eiger9m.create_dataset(name="polar_angle", data=0) -# # polar_angle.attrs["units"] = "degrees" -# # azimuthal_angle = eiger9m.create_dataset(name="azimuthal_angle", data=0) -# # azimuthal_angle.attrs["units"] = "degrees" -# # rotation_angle = eiger9m.create_dataset(name="rotation_angle", data=0) -# # rotation_angle.attrs["units"] = "degrees" -# # description = eiger9m.create_dataset( -# # name="description", -# # data="Eiger9M detector, in-house developed, Paul Scherrer Institute", -# # ) -# # orientation = eiger9m.create_group("orientation") -# # orientation.attrs["description"] = ( -# # "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam." -# # ) -# # orientation.create_dataset(name="transpose", data=1) -# # orientation.create_dataset(name="rot90", data=3) -# # data = eiger9m.create_ext_link( -# # "data", self.file_references["eiger9m"]["path"], "EG9M/data" -# # ) -# # status = eiger9m.create_ext_link( -# # "status", self.file_references["eiger9m"]["path"], "EG9M/status" -# # ) - -# # if ( -# # "pilatus_2" in self.device_manager.devices -# # and self.device_manager.devices.pilatus_2.enabled -# # and "pilatus_2" in self.file_references -# # ): -# # pilatus_2 = instrument.create_group("pilatus_2") -# # pilatus_2.attrs["NX_class"] = "NXdetector" -# # x_pixel_size = pilatus_2.create_dataset(name="x_pixel_size", data=172) -# # x_pixel_size.attrs["units"] = "um" -# # y_pixel_size = pilatus_2.create_dataset(name="y_pixel_size", data=172) -# # y_pixel_size.attrs["units"] = "um" -# # polar_angle = pilatus_2.create_dataset(name="polar_angle", data=0) -# # polar_angle.attrs["units"] = "degrees" -# # azimuthal_angle = pilatus_2.create_dataset(name="azimuthal_angle", data=0) -# # azimuthal_angle.attrs["units"] = "degrees" -# # rotation_angle = pilatus_2.create_dataset(name="rotation_angle", data=0) -# # rotation_angle.attrs["units"] = "degrees" -# # description = pilatus_2.create_dataset( -# # name="description", data="Pilatus 300K detector, Dectris, Switzerland" -# # ) -# # orientation = pilatus_2.create_group("orientation") -# # orientation.attrs["description"] = ( -# # "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam." -# # ) -# # orientation.create_dataset(name="transpose", data=1) -# # orientation.create_dataset(name="rot90", data=2) -# # data = pilatus_2.create_ext_link( -# # "data", self.file_references["pilatus_2"]["path"], "entry/instrument/pilatus_2/data" -# # ) - -# # if ( -# # "falcon" in self.device_manager.devices -# # and self.device_manager.devices.falcon.enabled -# # and "falcon" in self.file_references -# # ): -# # falcon = instrument.create_ext_link( -# # "falcon", self.file_references["falcon"]["path"], "entry/instrument/FalconX1" -# # ) + # Real-time encoder positions (RtFlomniFlyer) + # Single soft link to the entire rt_positions folder in the BEC + # collection. This is the primary scan coordinate for ptychography. + self._safe_soft_link( + flomni, "rt_positions", + "/entry/collection/devices/rt_positions", + ) -- 2.52.0