Compare commits

..

49 Commits

Author SHA1 Message Date
x12sa
ac23be094e tomo id on private server added
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m56s
2026-04-01 13:15:10 +02:00
x12sa
10cc820b2c url for tomo id
All checks were successful
CI for csaxs_bec / test (push) Successful in 2m0s
2026-04-01 11:47:53 +02:00
x12sa
acc3fb0104 webpage quet http server, signal 1to1 device, first galil configs
All checks were successful
CI for csaxs_bec / test (push) Successful in 2m39s
2026-03-31 17:50:03 +02:00
6a704c6dd0 test: add tests for bpm and bpm_control
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 2m0s
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 1s
CI for csaxs_bec / test (push) Successful in 1m58s
2026-03-31 14:25:16 +02:00
2e014bd9ea fix(pseudo_devices): fix pseudo devices, bpm and bpm_control 2026-03-31 14:25:16 +02:00
x12sa
006a451220 feat: Add BPM and BPMControl pseudo devices 2026-03-31 14:25:07 +02:00
x12sa
bdc996d3b2 some adjustments in structure
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m57s
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 1s
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-31 12:56:15 +02:00
x12sa
2fac8bc1d7 minor config updates
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m56s
2026-03-31 11:12:55 +02:00
x12sa
bf045dadf1 first version from claude
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-30 17:13:45 +02:00
x12sa
be508cf300 first version with working entries 2026-03-30 16:48:47 +02:00
x12sa
f786e34a0e fix thread stop
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m56s
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 1s
CI for csaxs_bec / test (push) Successful in 1m58s
2026-03-30 14:41:44 +02:00
x12sa
cceedc947a local server added
All checks were successful
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 1s
CI for csaxs_bec / test (push) Successful in 2m0s
2026-03-30 13:07:46 +02:00
x12sa
80de9724d4 removed linear progress bar 2026-03-30 13:07:46 +02:00
x12sa
2ac02e0623 http error message cooldown to 10 mins 2026-03-30 13:07:46 +02:00
x12sa
3c2a0aa484 error message cooldown 2026-03-30 13:07:46 +02:00
x12sa
27f4eca4ae change url and fix in https certificate ignore 2026-03-30 13:07:46 +02:00
x12sa
f2771bd4b6 https without certificate possible 2026-03-30 13:07:46 +02:00
x12sa
546ebf8a58 mod audio gen 2026-03-30 13:07:46 +02:00
d3f1d31bb8 fix(file_writer): Fix file_writer format method.
All checks were successful
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 2s
CI for csaxs_bec / test (push) Successful in 2m25s
2026-03-30 12:34:07 +02:00
6d404cad12 fix(omny/shutter): MonitorSignal wrapper with auto_monitor
All checks were successful
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 1s
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-27 22:41:57 +01:00
x12sa
b67e1c012c renamed rt flyer to rt positions
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m54s
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 1s
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-27 16:05:35 +01:00
x12sa
cbbec12d9b option to upload to a php interface
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-27 15:44:49 +01:00
x12sa
8f4a9f025e some adjustments
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m56s
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 2s
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-27 13:43:28 +01:00
x12sa
1b9b983ab2 wip optics config for energy device
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m56s
2026-03-27 12:48:51 +01:00
x12sa
d7b442969a added motors to endstation config 2026-03-27 12:48:51 +01:00
x12sa
f92db3f169 movable cards and more
All checks were successful
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 3s
CI for csaxs_bec / test (push) Successful in 1m55s
2026-03-26 16:38:51 +01:00
x12sa
55531c8a65 next version 2026-03-26 16:38:51 +01:00
x12sa
1d408818cc next iteration, seems a first good and usable ver. 2026-03-26 16:38:51 +01:00
x12sa
ae2045dd10 fixes in contrast and audio confirm logif 2026-03-26 16:38:51 +01:00
x12sa
fd4d455a5b webpage version2 2026-03-26 16:38:51 +01:00
x12sa
3411aaaeb4 first version of webpage 2026-03-26 16:38:51 +01:00
x12sa
d9fc3094b6 fixed width for logo in scilog
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m59s
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 2s
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-25 13:39:07 +01:00
x12sa
88df4781ec tags added
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m54s
2026-03-24 15:36:25 +01:00
x12sa
3b474c89c8 removed write subtomo to scilog 2026-03-24 15:34:36 +01:00
x12sa
68cc13e1d3 alignment scans to scilog 2026-03-24 15:33:14 +01:00
x12sa
700f3f9bb9 scilog tag added 2026-03-24 15:27:25 +01:00
x12sa
15a4d45f68 moved tomo_reconstruct to tomo_scan_projection
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m59s
2026-03-24 15:00:17 +01:00
x12sa
7c7f877d78 new logos for logbook 2026-03-24 14:59:50 +01:00
x12sa
5d61d756c9 logo and scilog newline fixed
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m56s
2026-03-23 17:00:28 +01:00
x12sa
b37ae3ef57 wip message to scilog when tomo starts
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m55s
2026-03-23 16:29:30 +01:00
x12sa
76ed858e5c added heartbeat, start and remaining time to progress
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-23 15:58:54 +01:00
x12sa
a0555def4d changed progress dict to global variable
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m56s
2026-03-23 15:48:17 +01:00
x12sa
c1ad2fc4c3 pdf status report fixes
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-23 12:38:37 +01:00
x12sa
9eee4ee1f7 minor fixes during testing
All checks were successful
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 2s
CI for csaxs_bec / test (push) Successful in 1m55s
2026-03-19 11:17:41 +01:00
c97b00cc8c fix: flomni async readout
All checks were successful
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 2s
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-19 11:13:26 +01:00
d6a4fd37fc fix(mcs): fix _progress_udpate
Some checks failed
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 2s
CI for csaxs_bec / test (push) Has been cancelled
2026-03-19 11:11:54 +01:00
6d4c9d90fc fix(mcs): omit_mca_callbacks if stop is called. 2026-03-19 11:11:54 +01:00
87163cc3f1 docs: run build on py 3-12
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m57s
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 4s
CI for csaxs_bec / test (push) Successful in 1m56s
2026-03-18 14:35:42 +01:00
7c17a3ae40 ci: add rtd workflow
Some checks failed
CI for csaxs_bec / test (push) Has been cancelled
2026-03-18 14:34:52 +01:00
33 changed files with 4510 additions and 750 deletions

View File

@@ -0,0 +1,21 @@
name: Read the Docs Deploy Trigger
on:
push:
branches:
- main
workflow_dispatch:
jobs:
trigger-rtd-webhook:
runs-on: ubuntu-latest
steps:
- name: Trigger Read the Docs webhook
env:
RTD_TOKEN: ${{ secrets.RTD_TOKEN }}
run: |
curl --fail --show-error --silent \
-X POST \
-d "branches=${{ github.ref_name }}" \
-d "token=${RTD_TOKEN}" \
"https://readthedocs.org/api/v2/webhook/sls-csaxs/270162/"

View File

@@ -8,15 +8,14 @@ version: 2
build:
os: ubuntu-20.04
tools:
python: "3.10"
python: "3.12"
jobs:
pre_install:
- pip install .
# Build documentation in the docs/ directory with Sphinx
sphinx:
configuration: docs/conf.py
configuration: docs/conf.py
# If using Sphinx, optionally build your docs in additional formats such as PDF
# formats:
@@ -24,6 +23,5 @@ sphinx:
# Optionally declare the Python requirements required to build your docs
python:
install:
- requirements: docs/requirements.txt
install:
- requirements: docs/requirements.txt

Binary file not shown.

After

Width:  |  Height:  |  Size: 562 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 48 KiB

View File

@@ -0,0 +1,89 @@
"""
LamNI/webpage_generator.py
===========================
LamNI-specific webpage generator subclass.
Integration (inside the LamNI __init__ / startup):
---------------------------------------------------
from csaxs_bec.bec_ipython_client.plugins.LamNI.webpage_generator import (
LamniWebpageGenerator,
)
self._webpage_gen = LamniWebpageGenerator(
bec_client=client,
output_dir="~/data/raw/webpage/",
)
self._webpage_gen.start()
Or use the factory (auto-selects by session name "lamni"):
----------------------------------------------------------
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
make_webpage_generator,
)
self._webpage_gen = make_webpage_generator(bec, output_dir="~/data/raw/webpage/")
self._webpage_gen.start()
Interactive helpers:
--------------------
lamni._webpage_gen.status()
lamni._webpage_gen.verbosity = 2
lamni._webpage_gen.stop()
lamni._webpage_gen.start()
"""
from pathlib import Path
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
WebpageGeneratorBase,
_safe_get,
_safe_float,
_gvar,
)
class LamniWebpageGenerator(WebpageGeneratorBase):
"""
LamNI-specific webpage generator.
Logo: LamNI.png from the same directory as this file.
Override _collect_setup_data() to add LamNI-specific temperatures,
sample name, and measurement settings.
"""
# TODO: fill in LamNI-specific device paths
# label -> dotpath under device_manager.devices
_TEMP_MAP = {
# "Sample": "lamni_temphum.temperature_sample",
# "OSA": "lamni_temphum.temperature_osa",
}
def _logo_path(self):
return Path(__file__).parent / "LamNI.png"
def _collect_setup_data(self) -> dict:
# ── LamNI-specific data goes here ─────────────────────────────
# Uncomment and adapt when device names are known:
#
# dm = self._bec.device_manager
# sample_name = _safe_get(dm, "lamni_samples.sample_names.sample0") or "N/A"
# temperatures = {
# label: _safe_float(_safe_get(dm, path))
# for label, path in self._TEMP_MAP.items()
# }
# settings = {
# "Sample name": sample_name,
# "FOV x / y": ...,
# "Exposure time": _gvar(self._bec, "tomo_countingtime", ".3f", " s"),
# "Angle step": _gvar(self._bec, "tomo_angle_stepsize", ".2f", "\u00b0"),
# }
# return {
# "type": "lamni",
# "sample_name": sample_name,
# "temperatures": temperatures,
# "settings": settings,
# }
# Placeholder — returns minimal info until implemented
return {
"type": "lamni",
# LamNI-specific data here
}

View File

@@ -70,7 +70,7 @@ DLPCA200_AMPLIFIER_CONFIG: dict[str, dict] = {
"rio_device": "galilrioesxbox",
"description": "Beam Position Monitor 4 current amplifier",
"channels": {
"gain_lsb": 0, # Pin 10 -> Galil ch0
"gain_lsb": rio_optics.analog_in.ch0, # Pin 10 -> Galil ch0
"gain_mid": 1, # Pin 11 -> Galil ch1
"gain_msb": 2, # Pin 12 -> Galil ch2
"coupling": 3, # Pin 13 -> Galil ch3

Binary file not shown.

After

Width:  |  Height:  |  Size: 124 KiB

View File

@@ -21,6 +21,14 @@ from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import (
TomoIDManager,
)
# from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
# FlomniWebpageGenerator,
# VERBOSITY_SILENT, # 0 — no output
# VERBOSITY_NORMAL, # 1 — startup / stop messages only (default)
# VERBOSITY_VERBOSE, # 2 — one-line summary per cycle
# VERBOSITY_DEBUG, # 3 — full JSON payload per cycle
# )
logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
@@ -969,8 +977,6 @@ class FlomniSampleTransferMixin:
class FlomniAlignmentMixin:
import csaxs_bec
import os
from pathlib import Path
# Ensure this is a Path object, not a string
csaxs_bec_basepath = Path(csaxs_bec.__file__)
@@ -1211,6 +1217,76 @@ class FlomniAlignmentMixin:
return additional_correction_shift
class _ProgressProxy:
"""Dict-like proxy that persists the flOMNI progress dict as a BEC global variable.
Every read (`proxy["key"]`) fetches the current dict from the global var store,
and every write (`proxy["key"] = val`) fetches, updates, and saves it back.
This makes the progress state visible to all BEC client sessions via
``client.get_global_var("tomo_progress")``.
"""
_GLOBAL_VAR_KEY = "tomo_progress"
_DEFAULTS: dict = {
"subtomo": 0,
"subtomo_projection": 0,
"subtomo_total_projections": 1,
"projection": 0,
"total_projections": 1,
"angle": 0,
"tomo_type": 0,
"tomo_start_time": None,
"estimated_remaining_time": None,
"heartbeat": None,
}
def __init__(self, client):
self._client = client
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _load(self) -> dict:
val = self._client.get_global_var(self._GLOBAL_VAR_KEY)
if val is None:
return dict(self._DEFAULTS)
return val
def _save(self, data: dict) -> None:
self._client.set_global_var(self._GLOBAL_VAR_KEY, data)
# ------------------------------------------------------------------
# Dict-like interface
# ------------------------------------------------------------------
def __getitem__(self, key):
return self._load()[key]
def __setitem__(self, key, value) -> None:
data = self._load()
data[key] = value
self._save(data)
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self._load()!r})"
def get(self, key, default=None):
return self._load().get(key, default)
def update(self, *args, **kwargs) -> None:
"""Update multiple fields in a single round-trip."""
data = self._load()
data.update(*args, **kwargs)
self._save(data)
def reset(self) -> None:
"""Reset all progress fields to their default values."""
self._save(dict(self._DEFAULTS))
def as_dict(self) -> dict:
"""Return a plain copy of the current progress state."""
return self._load()
class Flomni(
FlomniInitStagesMixin,
FlomniSampleTransferMixin,
@@ -1233,14 +1309,20 @@ class Flomni(
self.corr_angle_y = []
self.corr_pos_y_2 = []
self.corr_angle_y_2 = []
self.progress = {}
self.progress["subtomo"] = 0
self.progress["subtomo_projection"] = 0
self.progress["subtomo_total_projections"] = 1
self.progress["projection"] = 0
self.progress["total_projections"] = 1
self.progress["angle"] = 0
self.progress["tomo_type"] = 0
self._progress_proxy = _ProgressProxy(self.client)
self._progress_proxy.reset()
from csaxs_bec.bec_ipython_client.plugins.flomni.flomni_webpage_generator import (
FlomniWebpageGenerator,
)
self._webpage_gen = FlomniWebpageGenerator(
bec_client=client,
output_dir="~/data/raw/webpage/",
#upload_url="http://s1090968537.online.de/upload.php", # optional
upload_url="https://v1p0zyg2w9n2k9c1.myfritz.net/upload.php",
local_port=8080
)
self._webpage_gen.start()
self.OMNYTools = OMNYTools(self.client)
self.reconstructor = PtychoReconstructor(self.ptycho_reconstruct_foldername)
self.tomo_id_manager = TomoIDManager()
@@ -1296,6 +1378,42 @@ class Flomni(
self.special_angles = []
self.special_angle_repeats = 1
@property
def progress(self) -> _ProgressProxy:
"""Proxy dict backed by the BEC global variable ``tomo_progress``.
Readable from any BEC client session via::
client.get_global_var("tomo_progress")
Individual fields can be read and written just like a regular dict::
flomni.progress["projection"] # read
flomni.progress["projection"] = 42 # write (persists immediately)
To update multiple fields atomically use :py:meth:`_ProgressProxy.update`::
flomni.progress.update(projection=42, angle=90.0)
To reset all fields to their defaults::
flomni.progress.reset()
"""
return self._progress_proxy
@progress.setter
def progress(self, val: dict) -> None:
"""Replace the entire progress dict.
Accepts a plain :class:`dict` and persists it to the global var store.
Example::
flomni.progress = {"projection": 0, "total_projections": 100, ...}
"""
if not isinstance(val, dict):
raise TypeError(f"progress must be a dict, got {type(val).__name__!r}")
self._progress_proxy._save(val)
@property
def tomo_shellstep(self):
val = self.client.get_global_var("tomo_shellstep")
@@ -1482,21 +1600,11 @@ class Flomni(
def sample_name(self):
return self.sample_get_name(0)
def write_to_scilog(self, content, tags: list = None):
try:
if tags is not None:
tags.append("BEC")
else:
tags = ["BEC"]
msg = bec.logbook.LogbookMessage()
msg.add_text(content).add_tag(tags)
self.client.logbook.send_logbook_message(msg)
except Exception:
logger.warning("Failed to write to scilog.")
def tomo_alignment_scan(self):
"""
Performs a tomogram alignment scan.
Collects all scan numbers acquired during the alignment, prints them at the end,
and creates a BEC scilog text entry summarising the alignment scan numbers.
"""
if self.get_alignment_offset(0) == (0, 0, 0):
print("It appears that the xrayeye alignemtn was not performend or loaded. Aborting.")
@@ -1506,11 +1614,9 @@ class Flomni(
self.feye_out()
tags = ["BEC_alignment_tomo", self.sample_name]
self.write_to_scilog(
f"Starting alignment scan. First scan number: {bec.queue.next_scan_number}.", tags
)
start_angle = 0
alignment_scan_numbers = []
angle_end = start_angle + 180
for angle in np.linspace(start_angle, angle_end, num=int(180 / 45) + 1, endpoint=True):
@@ -1522,7 +1628,6 @@ class Flomni(
try:
start_scan_number = bec.queue.next_scan_number
self.tomo_scan_projection(angle)
self.tomo_reconstruct()
error_caught = False
except AlarmBase as exc:
if exc.alarm_type == "TimeoutError":
@@ -1536,24 +1641,27 @@ class Flomni(
end_scan_number = bec.queue.next_scan_number
for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, 0)
#self._write_tomo_scan_number(scan_nr, angle, 0)
alignment_scan_numbers.append(scan_nr)
umv(dev.fsamroy, 0)
self.OMNYTools.printgreenbold(
"\n\nAlignment scan finished. Please run SPEC_ptycho_align and load the new fit by flomni.read_alignment_offset() ."
)
def _write_subtomo_to_scilog(self, subtomo_number):
dev = builtins.__dict__.get("dev")
bec = builtins.__dict__.get("bec")
if self.tomo_id > 0:
tags = ["BEC_subtomo", self.sample_name, f"tomo_id_{self.tomo_id}"]
else:
tags = ["BEC_subtomo", self.sample_name]
self.write_to_scilog(
f"Starting subtomo: {subtomo_number}. First scan number: {bec.queue.next_scan_number}.",
tags,
# summary of alignment scan numbers
scan_list_str = ", ".join(str(s) for s in alignment_scan_numbers)
#print(f"\nAlignment scan numbers ({len(alignment_scan_numbers)} total): {scan_list_str}")
# BEC scilog entry (no logo)
scilog_content = (
f"Alignment scan finished.\n"
f"Sample: {self.sample_name}\n"
f"Number of alignment scans: {len(alignment_scan_numbers)}\n"
f"Alignment scan numbers: {scan_list_str}\n"
)
print(scliog_content)
bec.messaging.scilog.new().add_text(scilog_content.replace("\n", "<br>")).add_tags("alignmentscan").send()
def sub_tomo_scan(self, subtomo_number, start_angle=None):
"""
@@ -1562,18 +1670,6 @@ class Flomni(
subtomo_number (int): The sub tomogram number.
start_angle (float, optional): The start angle of the scan. Defaults to None.
"""
# dev = builtins.__dict__.get("dev")
# bec = builtins.__dict__.get("bec")
# if self.tomo_id > 0:
# tags = ["BEC_subtomo", self.sample_name, f"tomo_id_{self.tomo_id}"]
# else:
# tags = ["BEC_subtomo", self.sample_name]
# self.write_to_scilog(
# f"Starting subtomo: {subtomo_number}. First scan number: {bec.queue.next_scan_number}.",
# tags,
# )
self._write_subtomo_to_scilog(subtomo_number)
if start_angle is not None:
print(f"Sub tomo scan with start angle {start_angle} requested.")
@@ -1673,6 +1769,7 @@ class Flomni(
successful = False
error_caught = False
if 0 <= angle < 180.05:
self.progress["heartbeat"] = datetime.datetime.now().isoformat()
print(f"Starting flOMNI scan for angle {angle} in subtomo {subtomo_number}")
self._print_progress()
while not successful:
@@ -1732,20 +1829,22 @@ class Flomni(
or (self.tomo_type == 3 and projection_number == None)
):
# pylint: disable=undefined-variable
# if bec.active_account != "":
# self.tomo_id = self.add_sample_database(
# self.sample_name,
# str(datetime.date.today()),
# bec.active_account,
# bec.queue.next_scan_number,
# "flomni",
# "test additional info",
# "BEC",
# )
# self.write_pdf_report()
# else:
self.tomo_id = 0
#pylint: disable=undefined-variable
if bec.active_account != "":
self.tomo_id = self.add_sample_database(
self.sample_name,
str(datetime.date.today()),
bec.active_account,
bec.queue.next_scan_number,
"flomni",
"test additional info",
"BEC",
)
self.write_pdf_report()
else:
self.tomo_id = 0
self.write_pdf_report()
self.progress["tomo_start_time"] = datetime.datetime.now().isoformat()
with scans.dataset_id_on_hold:
if self.tomo_type == 1:
@@ -1765,7 +1864,6 @@ class Flomni(
while True:
angle, subtomo_number = self._golden(ii, self.golden_ratio_bunch_size, 180, 1)
if previous_subtomo_number != subtomo_number:
self._write_subtomo_to_scilog(subtomo_number)
if (
subtomo_number % 2 == 1
and ii > 10
@@ -1813,7 +1911,6 @@ class Flomni(
ii, int(180 / self.tomo_angle_stepsize), 180, 1, 0
)
if previous_subtomo_number != subtomo_number:
self._write_subtomo_to_scilog(subtomo_number)
if (
subtomo_number % 2 == 1
and ii > 10
@@ -1855,14 +1952,42 @@ class Flomni(
self._print_progress()
self.OMNYTools.printgreenbold("Tomoscan finished")
@staticmethod
def _format_duration(seconds: float) -> str:
"""Format a duration in seconds as a human-readable string, e.g. '2h 03m 15s'."""
seconds = int(seconds)
h, remainder = divmod(seconds, 3600)
m, s = divmod(remainder, 60)
if h > 0:
return f"{h}h {m:02d}m {s:02d}s"
if m > 0:
return f"{m}m {s:02d}s"
return f"{s}s"
def _print_progress(self):
# --- compute and store estimated remaining time -----------------------
start_str = self.progress.get("tomo_start_time")
projection = self.progress["projection"]
total = self.progress["total_projections"]
if start_str is not None and total > 0 and projection > 9:
elapsed = (
datetime.datetime.now() - datetime.datetime.fromisoformat(start_str)
).total_seconds()
rate = projection / elapsed # projections per second
remaining_s = (total - projection) / rate
self.progress["estimated_remaining_time"] = remaining_s
eta_str = self._format_duration(remaining_s)
else:
eta_str = "N/A"
# ----------------------------------------------------------------------
print("\x1b[95mProgress report:")
print(f"Tomo type: ....................... {self.progress['tomo_type']}")
print(f"Projection: ...................... {self.progress['projection']:.0f}")
print(f"Total projections expected ....... {self.progress['total_projections']}")
print(f"Angle: ........................... {self.progress['angle']}")
print(f"Current subtomo: ................. {self.progress['subtomo']}")
print(f"Current projection within subtomo: {self.progress['subtomo_projection']}\x1b[0m")
print(f"Current projection within subtomo: {self.progress['subtomo_projection']}")
print(f"Estimated remaining time: ........ {eta_str}\x1b[0m")
self._flomnigui_update_progress()
def add_sample_database(
@@ -1886,7 +2011,6 @@ class Flomni(
return
self.tomo_scan_projection(angle)
self.tomo_reconstruct()
def _golden(self, ii, howmany_sorted, maxangle, reverse=False):
"""returns the iis golden ratio angle of sorted bunches of howmany_sorted and its subtomo number"""
@@ -1991,7 +2115,7 @@ class Flomni(
f"{str(datetime.datetime.now())}: flomni scan projection at angle {angle}, scan"
f" number {bec.queue.next_scan_number}.\n"
)
# self.write_to_scilog(log_message, ["BEC_scans", self.sample_name])
scans.flomni_fermat_scan(
fovx=self.fovx,
fovy=self.fovy,
@@ -2004,6 +2128,9 @@ class Flomni(
corridor_size=corridor_size,
)
self.tomo_reconstruct()
def tomo_parameters(self):
"""print and update the tomo parameters"""
print("Current settings:")
@@ -2142,19 +2269,21 @@ class Flomni(
+ ' 888 888 "Y88888P" 888 888 888 Y888 8888888 \n'
)
padding = 20
fovxy = f"{self.fovx:.2f}/{self.fovy:.2f}"
stitching = f"{self.stitch_x:.2f}/{self.stitch_y:.2f}"
fovxy = f"{self.fovx:.1f}/{self.fovy:.1f}"
stitching = f"{self.stitch_x:.0f}/{self.stitch_y:.0f}"
dataset_id = str(self.client.queue.next_dataset_number)
account = bec.active_account
content = [
f"{'Sample Name:':<{padding}}{self.sample_name:>{padding}}\n",
f"{'Measurement ID:':<{padding}}{str(self.tomo_id):>{padding}}\n",
f"{'Dataset ID:':<{padding}}{dataset_id:>{padding}}\n",
f"{'Sample Info:':<{padding}}{'Sample Info':>{padding}}\n",
f"{'e-account:':<{padding}}{str(self.client.username):>{padding}}\n",
f"{'e-account:':<{padding}}{str(account):>{padding}}\n",
f"{'Number of projections:':<{padding}}{int(180 / self.tomo_angle_stepsize * 8):>{padding}}\n",
f"{'First scan number:':<{padding}}{self.client.queue.next_scan_number:>{padding}}\n",
f"{'Last scan number approx.:':<{padding}}{self.client.queue.next_scan_number + int(180 / self.tomo_angle_stepsize * 8) + 10:>{padding}}\n",
f"{'Current photon energy:':<{padding}}{dev.mokev.read()['mokev']['value']:>{padding}.4f}\n",
f"{'Current photon energy:':<{padding}}To be implemented\n",
#f"{'Current photon energy:':<{padding}}{dev.mokev.read()['mokev']['value']:>{padding}.4f}\n",
f"{'Exposure time:':<{padding}}{self.tomo_countingtime:>{padding}.2f}\n",
f"{'Fermat spiral step size:':<{padding}}{self.tomo_shellstep:>{padding}.2f}\n",
f"{'FOV:':<{padding}}{fovxy:>{padding}}\n",
@@ -2163,20 +2292,38 @@ class Flomni(
f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n",
]
content = "".join(content)
user_target = os.path.expanduser(f"~/Data10/documentation/tomo_scan_ID_{self.tomo_id}.pdf")
user_target = os.path.expanduser(f"~/data/raw/documentation/tomo_scan_ID_{self.tomo_id}.pdf")
with PDFWriter(user_target) as file:
file.write(header)
file.write(content)
subprocess.run(
"xterm /work/sls/spec/local/XOMNY/bin/upload/upload_last_pon.sh &", shell=True
)
# subprocess.run(
# "xterm /work/sls/spec/local/XOMNY/bin/upload/upload_last_pon.sh &", shell=True
# )
# status = subprocess.run(f"cp /tmp/spec-e20131-specES1.pdf {user_target}", shell=True)
msg = bec.logbook.LogbookMessage()
logo_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LamNI_logo.png")
msg.add_file(logo_path).add_text("".join(content).replace("\n", "</p><p>")).add_tag(
["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "LamNI", self.sample_name]
)
self.client.logbook.send_logbook_message(msg)
# msg = bec.tomo_progress.tomo_progressMessage()
# logo_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LamNI_logo.png")
# msg.add_file(logo_path).add_text("".join(content).replace("\n", "</p><p>")).add_tag(
# ["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "flOMNI", self.sample_name]
# )
# self.client.tomo_progress.send_tomo_progress_message("~/data/raw/documentation/tomo_scan_ID_{self.tomo_id}.pdf").send()
import csaxs_bec
# Ensure this is a Path object, not a string
csaxs_bec_basepath = Path(csaxs_bec.__file__)
logo_file_rel = "flOMNI.png"
# Build the absolute path correctly
logo_file = (
csaxs_bec_basepath.parent
/ "bec_ipython_client"
/ "plugins"
/ "flomni"
/ logo_file_rel
).resolve()
print(logo_file)
bec.messaging.scilog.new().add_attachment(logo_file, width=200).add_text(content.replace("\n", "<br>")).add_tags("tomoscan").send()
if __name__ == "__main__":

File diff suppressed because it is too large Load Diff

View File

@@ -223,6 +223,14 @@ class flomniGuiTools:
self._flomnigui_update_progress()
def _flomnigui_update_progress(self):
"""Update the progress ring bar and center label from the current progress state.
``self.progress`` is backed by the BEC global variable ``tomo_progress``
(see :class:`_ProgressProxy` in ``flomni.py``), so this method reflects
the live state that is also accessible from other BEC client sessions via::
client.get_global_var("tomo_progress")
"""
main_progress_ring = self.progressbar.rings[0]
subtomo_progress_ring = self.progressbar.rings[1]
if self.progressbar is not None:
@@ -235,6 +243,31 @@ class flomniGuiTools:
main_progress_ring.set_value(progress)
subtomo_progress_ring.set_value(subtomo_progress)
# --- format start time for display --------------------------------
start_str = self.progress.get("tomo_start_time")
if start_str is not None:
import datetime as _dt
start_display = _dt.datetime.fromisoformat(start_str).strftime("%Y-%m-%d %H:%M:%S")
else:
start_display = "N/A"
# --- format estimated remaining time ------------------------------
remaining_s = self.progress.get("estimated_remaining_time")
if remaining_s is not None and remaining_s >= 0:
import datetime as _dt
remaining_s = int(remaining_s)
h, rem = divmod(remaining_s, 3600)
m, s = divmod(rem, 60)
if h > 0:
eta_display = f"{h}h {m:02d}m {s:02d}s"
elif m > 0:
eta_display = f"{m}m {s:02d}s"
else:
eta_display = f"{s}s"
else:
eta_display = "N/A"
# ------------------------------------------------------------------
text = (
f"Progress report:\n"
f" Tomo type: {self.progress['tomo_type']}\n"
@@ -243,7 +276,9 @@ class flomniGuiTools:
f" Angle: {self.progress['angle']:.1f}\n"
f" Current subtomo: {self.progress['subtomo']}\n"
f" Current projection within subtomo: {self.progress['subtomo_projection']}\n"
f" Total projections per subtomo: {int(self.progress['subtomo_total_projections'])}"
f" Total projections per subtomo: {int(self.progress['subtomo_total_projections'])}\n"
f" Scan started: {start_display}\n"
f" Est. remaining: {eta_display}"
)
self.progressbar.set_center_label(text)

Binary file not shown.

After

Width:  |  Height:  |  Size: 359 KiB

View File

@@ -234,9 +234,10 @@ class TomoIDManager:
)
"""
OMNY_URL = "https://omny.web.psi.ch/samples/newmeasurement.php"
OMNY_USER = "omny"
OMNY_PASSWORD = "samples"
#OMNY_URL = "https://omny.web.psi.ch/samples/newmeasurement.php"
OMNY_URL = "https://v1p0zyg2w9n2k9c1.myfritz.net/samples/newmeasurement.php"
OMNY_USER = ""
OMNY_PASSWORD = ""
TMP_FILE = "/tmp/currsamplesnr.txt"
def register(
@@ -273,9 +274,14 @@ class TomoIDManager:
f"&additional={additional_info}"
f"&user={user}"
)
# subprocess.run(
# f"wget --user={self.OMNY_USER} --password={self.OMNY_PASSWORD}"
# f" -q -O {self.TMP_FILE} '{url}'",
# shell=True,
# )
#print(url)
subprocess.run(
f"wget --user={self.OMNY_USER} --password={self.OMNY_PASSWORD}"
f" -q -O {self.TMP_FILE} '{url}'",
f"wget -q -O {self.TMP_FILE} '{url}'",
shell=True,
)
with open(self.TMP_FILE) as f:

View File

@@ -0,0 +1,96 @@
"""
omny/webpage_generator.py
==========================
OMNY-specific webpage generator subclass.
Integration (inside the OMNY __init__ / startup):
--------------------------------------------------
from csaxs_bec.bec_ipython_client.plugins.omny.webpage_generator import (
OmnyWebpageGenerator,
)
self._webpage_gen = OmnyWebpageGenerator(
bec_client=client,
output_dir="~/data/raw/webpage/",
)
self._webpage_gen.start()
Or use the factory (auto-selects by session name "omny"):
---------------------------------------------------------
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
make_webpage_generator,
)
self._webpage_gen = make_webpage_generator(bec, output_dir="~/data/raw/webpage/")
self._webpage_gen.start()
Interactive helpers:
--------------------
omny._webpage_gen.status()
omny._webpage_gen.verbosity = 2
omny._webpage_gen.stop()
omny._webpage_gen.start()
"""
from pathlib import Path
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
WebpageGeneratorBase,
_safe_get,
_safe_float,
_gvar,
)
class OmnyWebpageGenerator(WebpageGeneratorBase):
"""
OMNY-specific webpage generator.
Logo: OMNY.png from the same directory as this file.
Override _collect_setup_data() to add OMNY-specific temperatures,
sample name, and measurement settings.
The old OMNY spec webpage showed:
- Cryo temperatures (XOMNY-TEMP-CRYO-A/B)
- Per-channel temperatures (XOMNY-TEMP1..48)
- Dewar pressure / LN2 flow
- Interferometer strengths (OINTERF)
Map these to BEC device paths below once available.
"""
# TODO: fill in OMNY-specific device paths
# label -> dotpath under device_manager.devices
_TEMP_MAP = {
# "Sample (cryo A)": "omny_temp.cryo_a",
# "Cryo head (B)": "omny_temp.cryo_b",
}
def _logo_path(self):
return Path(__file__).parent / "OMNY.png"
def _collect_setup_data(self) -> dict:
# ── OMNY-specific data goes here ──────────────────────────────
# Uncomment and adapt when device names are known:
#
# dm = self._bec.device_manager
# sample_name = _safe_get(dm, "omny_samples.sample_names.sample0") or "N/A"
# temperatures = {
# label: _safe_float(_safe_get(dm, path))
# for label, path in self._TEMP_MAP.items()
# }
# settings = {
# "Sample name": sample_name,
# "FOV x / y": ...,
# "Exposure time": _gvar(self._bec, "tomo_countingtime", ".3f", " s"),
# "Angle step": _gvar(self._bec, "tomo_angle_stepsize", ".2f", "\u00b0"),
# }
# return {
# "type": "omny",
# "sample_name": sample_name,
# "temperatures": temperatures,
# "settings": settings,
# }
# Placeholder — returns minimal info until implemented
return {
"type": "omny",
# OMNY-specific data here
}

View File

@@ -72,7 +72,7 @@ xbpm3x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -95,7 +95,7 @@ xbpm3y:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -118,7 +118,7 @@ sl3trxi:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -141,7 +141,7 @@ sl3trxo:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -164,7 +164,7 @@ sl3trxb:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -187,7 +187,7 @@ sl3trxt:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -210,7 +210,7 @@ fast_shutter_n1_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -234,7 +234,7 @@ fast_shutter_o1_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -257,7 +257,7 @@ fast_shutter_o2_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -280,7 +280,7 @@ filter_array_1_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -303,7 +303,7 @@ filter_array_2_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -326,7 +326,7 @@ filter_array_3_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -349,7 +349,7 @@ filter_array_4_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -372,7 +372,7 @@ sl4trxi:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -395,7 +395,7 @@ sl4trxo:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -418,7 +418,7 @@ sl4trxb:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -441,7 +441,7 @@ sl4trxt:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -466,7 +466,7 @@ sl5trxi:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -489,7 +489,7 @@ sl5trxo:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -512,7 +512,7 @@ sl5trxb:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -535,7 +535,7 @@ sl5trxt:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -544,6 +544,66 @@ sl5trxt:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 5
sl5ch:
description: ESbox1 slit 5 center horizontal
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitCenter
deviceConfig:
left_slit: sl5trxi
right_slit: sl5trxo
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxi
- sl5trxo
sl5wh:
description: ESbox1 slit 5 width horizontal
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitWidth
deviceConfig:
left_slit: sl5trxi
right_slit: sl5trxo
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxi
- sl5trxo
sl5cv:
description: ESbox1 slit 5 center vertical
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitCenter
deviceConfig:
left_slit: sl5trxb
right_slit: sl5trxt
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxb
- sl5trxt
sl5wv:
description: ESbox1 slit 5 width vertical
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitWidth
deviceConfig:
left_slit: sl5trxb
right_slit: sl5trxt
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxb
- sl5trxt
xbimtrx:
description: ESbox2 beam intensity monitor x movement
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
@@ -558,7 +618,7 @@ xbimtrx:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -581,7 +641,7 @@ xbimtry:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -589,3 +649,295 @@ xbimtry:
init_position: 0
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 1
################### XBOX related ###################
# we assue the epics settings for resolution, velocity etc. are correct
# we do not overwrite from here
aptrx:
description: Aperture pinhole X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-PIN1:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
aptry:
description: Aperture pinhole Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-PIN1:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
ebtrx:
description: Exposure box aperture X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-EB:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
ebtry:
description: Exposure box aperture Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-EB:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
ebtrz:
description: Exposure box aperture Z
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-EB:TRZ1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
ebsupport:
description: Exposure box granite support Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-EH1-EB:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
fttrx1:
description: FTS1 translation X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-FTS1:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
fttry1:
description: FTS1 translation Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-FTS1:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
fttrx2:
description: FTS2 translation X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-FTS2:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
fttry2:
description: FTS2 translation Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-FTS2:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
fttrz:
description: FTS1 translation Z
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-FTS1:TRZ1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
bs1x:
description: Beamstop 1 X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-BS1:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
bs1y:
description: Beamstop 1 Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-BS1:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
bs2x:
description: Beamstop 2 X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-BS2:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
bs2y:
description: Beamstop 2 Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-BS2:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
dttrx:
description: Detector table X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-DETT:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
dttry:
description: Detector table Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-DETT:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
dttrz:
description: Detector table Z
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-DETT:TRZ1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
dettrx:
description: Detector 1 X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-DET1:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
####################
### Beamstop diode control for flight tube
####################
beamstop_gain_control:
description: Gain control for beamstop flightube
deviceClass: csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl
deviceConfig:
gain_lsb: galilrioesft.digital_out.ch0 # Pin 10 -> Galil ch0
gain_mid: galilrioesft.digital_out.ch1 # Pin 11 -> Galil ch1
gain_msb: galilrioesft.digital_out.ch2 # Pin 12 -> Galil ch2
coupling: galilrioesft.digital_out.ch3 # Pin 13 -> Galil ch3
speed_mode: galilrioesft.digital_out.ch4 # Pin 14 -> Galil ch4
enabled: true
readoutPriority: baseline
onFailure: retry
needs:
- galilrioesft
galilrioesft:
description: Galil RIO for remote gain switching and slow reading FlightTube
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
deviceConfig:
host: galilrioesft.psi.ch
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
beamstop_dummy_bpm:
description: BPM Xbox 2 (First Xbox in ES hutch)
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
deviceConfig:
left_top: galilrioesft.analog_in.ch0
right_top: galilrioesft.analog_in.ch1
right_bot: galilrioesft.analog_in.ch2
left_bot: galilrioesft.analog_in.ch3
enabled: true
readoutPriority: baseline
onFailure: retry
needs:
- galilrioesft
beamstop_intensity:
description: Beamstop intensity from Galil analog input ch6
deviceClass: csaxs_bec.devices.pseudo_devices.signal_forwarder.SignalForwarder
deviceConfig:
signal: galilrioesft.analog_in.ch6
enabled: true
readoutPriority: baseline
onFailure: retry
needs:
- galilrioesft

View File

@@ -199,6 +199,25 @@ xbpm1c4:
readOnly: true
softwareTrigger: false
bpm1:
description: 'XBPM1 (frontend)'
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
deviceConfig:
left_top: xbpm1c1
right_top: xbpm1c2
right_bot: xbpm1c3
left_bot: xbpm1c4
onFailure: raise
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
needs:
- xbpm1c1
- xbpm1c2
- xbpm1c3
- xbpm1c4
############################################
######### End of xbpm sub devices ##########
############################################

View File

@@ -68,91 +68,110 @@ ccmx:
- cSAXS
- optics
# TO BE REVIEWED, REMOVE VELOCITY WITH NEW CLASS!
ccm_energy:
description: 'test'
deviceClass: ophyd_devices.devices.simple_positioner.PSISimplePositioner
deviceConfig:
prefix: 'X12SA-OP-CCM1:'
override_suffixes:
user_readback: "ENERGY-GET"
user_setpoint: "ENERGY-SET"
velocity: "ROTY.VELO"
motor_done_move: "ROTY.DMOV"
onFailure: buffer
enabled: true
readoutPriority: baseline
readOnly: false
softwareTrigger: false
##########################################################################
######################## SMARACT STAGES ##################################
##########################################################################
xbpm2x:
description: X-ray beam position monitor 1 in OPbox
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: A
host: x12sa-eb-smaract-mcs-03.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 0
# xbpm2x:
# description: X-ray beam position monitor 1 in OPbox
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
# deviceConfig:
# axis_Id: A
# host: x12sa-eb-smaract-mcs-03.psi.ch
# limits:
# - -200
# - 200
# port: 5000
# sign: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: baseline
# connectionTimeout: 20
# userParameter:
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
# bl_smar_stage: 0
xbpm2y:
description: X-ray beam position monitor 1 in OPbox
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: B
host: x12sa-eb-smaract-mcs-03.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 1
# xbpm2y:
# description: X-ray beam position monitor 1 in OPbox
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
# deviceConfig:
# axis_Id: B
# host: x12sa-eb-smaract-mcs-03.psi.ch
# limits:
# - -200
# - 200
# port: 5000
# sign: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: baseline
# connectionTimeout: 20
# userParameter:
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
# bl_smar_stage: 1
# cu_foilx:
# description: Cu foil in OPbox
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
# deviceConfig:
# axis_Id: C
# host: x12sa-eb-smaract-mcs-03.psi.ch
# limits:
# - -200
# - 200
# port: 5000
# sign: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: baseline
# connectionTimeout: 20
# userParameter:
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
# bl_smar_stage: 2
# scinx:
# description: scintillator in OPbox
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
# deviceConfig:
# axis_Id: D
# host: x12sa-eb-smaract-mcs-03.psi.ch
# limits:
# - -200
# - 200
# port: 5000
# sign: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: baseline
# connectionTimeout: 20
# userParameter:
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
# bl_smar_stage: 3
cu_foilx:
description: Cu foil in OPbox
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: C
host: x12sa-eb-smaract-mcs-03.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 2
scinx:
description: scintillator in OPbox
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: D
host: x12sa-eb-smaract-mcs-03.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 3
# dmm1_trx_readback_example: # This is the same template as for i.e. bpm4i

View File

@@ -1,8 +1,8 @@
# This is the main configuration file that is
# commented or uncommented according to the type of experiment
# optics:
# - !include ./bl_optics_hutch.yaml
optics:
- !include ./bl_optics_hutch.yaml
# frontend:
# - !include ./bl_frontend.yaml

View File

@@ -395,6 +395,16 @@ rtz:
readoutPriority: on_request
connectionTimeout: 20
rt_positions:
deviceClass: csaxs_bec.devices.omny.rt.rt_flomni_ophyd.RtFlomniFlyer
deviceConfig:
host: mpc2844.psi.ch
port: 2222
readoutPriority: async
connectionTimeout: 20
enabled: true
readOnly: False
############################################################
####################### Cameras ############################
############################################################
@@ -512,6 +522,19 @@ omny_panda:
FMC_IN.VAL2.Min: cap_voltage_fzp_x_min
FMC_IN.VAL2.Max: cap_voltage_fzp_x_max
FMC_IN.VAL2.Mean: cap_voltage_fzp_x_mean
INENC1.VAL.Max: interf_st_fzp_y_max
INENC1.VAL.Mean: interf_st_fzp_y_mean
INENC1.VAL.Min: interf_st_fzp_y_min
INENC2.VAL.Max: interf_st_fzp_x_max
INENC2.VAL.Mean: interf_st_fzp_x_mean
INENC2.VAL.Min: interf_st_fzp_x_min
INENC3.VAL.Max: interf_st_rotz_max
INENC3.VAL.Mean: interf_st_rotz_mean
INENC3.VAL.Min: interf_st_rotz_min
INENC4.VAL.Max: interf_st_rotx_max
INENC4.VAL.Mean: interf_st_rotx_mean
INENC4.VAL.Min: interf_st_rotx_min
PCAP.GATE_DURATION.Value: pcap_gate_duration_value
deviceTags:
- detector
enabled: true

View File

@@ -0,0 +1,24 @@
galilrioesxbox:
description: Galil RIO for remote gain switching and slow reading ES XBox
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
deviceConfig:
host: galilrioesft.psi.ch
enabled: true
onFailure: raise
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
bpm1:
readoutPriority: baseline
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
deviceConfig:
blade_t: galilrioesxbox.analog_in.ch0
blade_r: galilrioesxbox.analog_in.ch1
blade_b: galilrioesxbox.analog_in.ch2
blade_l: galilrioesxbox.analog_in.ch3
enabled: true
readOnly: false
softwareTrigger: true
needs:
- galilrioesxbox

View File

@@ -317,8 +317,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
try:
scan_done = bool(value == self._num_total_triggers)
self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done)
if scan_done:
self._scan_done_event.set()
except Exception:
content = traceback.format_exc()
logger.info(f"Device {self.name} error: {content}")
@@ -393,6 +391,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
self._current_data_index = 0
# NOTE Make sure that the signal that omits mca callbacks is cleared
# DO NOT REMOVE!!
self._omit_mca_callbacks.clear()
# For a fly scan we need to start the mcs card ourselves
@@ -563,8 +562,9 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
def on_stop(self) -> None:
"""Hook called when the device is stopped. In addition, any status that is registered through cancel_on_stop will be cancelled here."""
self.stop_all.put(1)
self.erase_all.put(1)
with suppress_mca_callbacks(self):
self.stop_all.put(1)
self.erase_all.put(1)
def mcs_recovery(self, timeout: int = 1) -> None:
"""

View File

@@ -1,20 +1,18 @@
import threading
import time
from typing import List
import numpy as np
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError
from ophyd_devices import AsyncMultiSignal, DeviceStatus, ProgressSignal
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected
from prettytable import PrettyTable
from csaxs_bec.devices.omny.rt.rt_ophyd import (
BECConfigError,
RtCommunicationError,
RtError,
RtReadbackSignal,
@@ -432,27 +430,6 @@ class RtFlomniController(Controller):
t.add_row([i, self.read_ssi_interferometer(i)])
print(t)
def _get_signals_from_table(self, return_table) -> dict:
self.average_stdeviations_x_st_fzp += float(return_table[4])
self.average_stdeviations_y_st_fzp += float(return_table[7])
signals = {
"target_x": {"value": float(return_table[2])},
"average_x_st_fzp": {"value": float(return_table[3])},
"stdev_x_st_fzp": {"value": float(return_table[4])},
"target_y": {"value": float(return_table[5])},
"average_y_st_fzp": {"value": float(return_table[6])},
"stdev_y_st_fzp": {"value": float(return_table[7])},
"average_rotz": {"value": float(return_table[8])},
"stdev_rotz": {"value": float(return_table[9])},
"average_stdeviations_x_st_fzp": {
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
},
"average_stdeviations_y_st_fzp": {
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
},
}
return signals
@threadlocked
def start_scan(self):
if not self.feedback_is_running():
@@ -492,91 +469,6 @@ class RtFlomniController(Controller):
current_position_in_scan = int(float(return_table[2]))
return (mode, number_of_positions_planned, current_position_in_scan)
def read_positions_from_sampler(self):
# this was for reading after the scan completed
number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read
read_counter = 0
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.average_lamni_angle = 0
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
# if not (mode==2 or mode==3):
# error
self.device_manager.connector.set(
MessageEndpoints.device_status("rt_scan"),
messages.DeviceStatusMessage(
device="rt_scan", status=1, metadata=self.readout_metadata
),
)
# while scan is running
while mode > 0:
# TODO here?: scan abortion if no progress in scan *raise error
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
time.sleep(0.01)
if current_position_in_scan > 5:
while current_position_in_scan > read_counter + 1:
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
# logger.info(f"{return_table}")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.publish_device_data(signals=signals, point_id=int(return_table[0]))
time.sleep(0.05)
# read the last samples even though scan is finished already
while number_of_positions_planned > read_counter:
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
# logger.info(f"{return_table}")
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.publish_device_data(signals=signals, point_id=int(return_table[0]))
self.device_manager.connector.set(
MessageEndpoints.device_status("rt_scan"),
messages.DeviceStatusMessage(
device="rt_scan", status=0, metadata=self.readout_metadata
),
)
logger.info(
"Flomni statistics: Average of all standard deviations: x"
f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y"
f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}"
)
def publish_device_data(self, signals, point_id):
self.device_manager.connector.set_and_publish(
MessageEndpoints.device_read("rt_flomni"),
messages.DeviceMessage(
signals=signals, metadata={"point_id": point_id, **self.readout_metadata}
),
)
def start_readout(self):
readout = threading.Thread(target=self.read_positions_from_sampler)
readout.start()
def kickoff(self, metadata):
self.readout_metadata = metadata
while not self._min_scan_buffer_reached:
time.sleep(0.001)
self.start_scan()
time.sleep(0.1)
self.start_readout()
class RtFlomniReadbackSignal(RtReadbackSignal):
@retry_once
@@ -844,6 +736,185 @@ class RtFlomniMotor(Device, PositionerBase):
return super().stop(success=success)
class RtFlomniFlyer(Device):
USER_ACCESS = ["controller"]
data = Cpt(
AsyncMultiSignal,
name="data",
signals=[
"target_x",
"average_x_st_fzp",
"stdev_x_st_fzp",
"target_y",
"average_y_st_fzp",
"stdev_y_st_fzp",
"average_rotz",
"stdev_rotz",
"average_stdeviations_x_st_fzp",
"average_stdeviations_y_st_fzp",
],
ndim=1,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
)
progress = Cpt(
ProgressSignal, doc="ProgressSignal indicating the progress of the device during a scan."
)
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
host="mpc2844.psi.ch",
port=2222,
socket_cls=SocketIO,
device_manager=None,
**kwargs,
):
super().__init__(prefix=prefix, name=name, parent=parent, **kwargs)
self.shutdown_event = threading.Event()
self.controller = RtFlomniController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
)
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.average_lamni_angle = 0
self.readout_thread = None
self.scan_done_event = threading.Event()
self.scan_done_event.set()
def read_positions_from_sampler(self, status: DeviceStatus):
"""
Read the positions from the sampler and update the data signal.
This function runs in a separate thread and continuously checks the
scan status.
Args:
status (DeviceStatus): The status object to update when the readout is complete.
"""
read_counter = 0
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.average_lamni_angle = 0
mode, number_of_positions_planned, current_position_in_scan = (
self.controller.get_scan_status()
)
# while scan is running
while mode > 0 and not self.shutdown_event.wait(0.01):
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
mode, number_of_positions_planned, current_position_in_scan = (
self.controller.get_scan_status()
)
if current_position_in_scan > 5:
while current_position_in_scan > read_counter + 1:
return_table = (
self.controller.socket_put_and_receive(f"r{read_counter}")
).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
self.progress.put(
value=read_counter, max_value=number_of_positions_planned, done=False
)
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.data.set(signals)
if self.shutdown_event.wait(0.05):
logger.info("Shutdown event set, stopping readout.")
# if we are here, the shutdown_event is set. We can exit the readout loop.
status.set_finished()
return
# read the last samples even though scan is finished already
while number_of_positions_planned > read_counter and not self.shutdown_event.is_set():
return_table = (self.controller.socket_put_and_receive(f"r{read_counter}")).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=False)
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.data.set(signals)
# NOTE: No need to set the status to failed if the shutdown_event is set.
# The stop() method will take care of that.
status.set_finished()
self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=True)
logger.info(
"Flomni statistics: Average of all standard deviations: x"
f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y"
f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}"
)
def _get_signals_from_table(self, return_table) -> dict:
self.average_stdeviations_x_st_fzp += float(return_table[4])
self.average_stdeviations_y_st_fzp += float(return_table[7])
signals = {
"target_x": {"value": float(return_table[2])},
"average_x_st_fzp": {"value": float(return_table[3])},
"stdev_x_st_fzp": {"value": float(return_table[4])},
"target_y": {"value": float(return_table[5])},
"average_y_st_fzp": {"value": float(return_table[6])},
"stdev_y_st_fzp": {"value": float(return_table[7])},
"average_rotz": {"value": float(return_table[8])},
"stdev_rotz": {"value": float(return_table[9])},
"average_stdeviations_x_st_fzp": {
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
},
"average_stdeviations_y_st_fzp": {
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
},
}
return signals
def stage(self):
self.shutdown_event.clear()
self.scan_done_event.set()
return super().stage()
def start_readout(self, status: DeviceStatus):
self.readout_thread = threading.Thread(
target=self.read_positions_from_sampler, args=(status,)
)
self.readout_thread.start()
def kickoff(self) -> DeviceStatus:
self.shutdown_event.clear()
self.scan_done_event.clear()
while not self.controller._min_scan_buffer_reached and not self.shutdown_event.wait(0.001):
...
self.controller.start_scan()
self.shutdown_event.wait(0.1)
status = DeviceStatus(self)
status.set_finished()
return status
def complete(self) -> DeviceStatus:
"""Wait until the flyer is done."""
if self.scan_done_event.is_set():
# if the scan_done_event is already set, we can return a finished status immediately
status = DeviceStatus(self)
status.set_finished()
return status
status = DeviceStatus(self)
self.start_readout(status)
status.add_callback(lambda *args, **kwargs: self.scan_done_event.set())
return status
def stop(self, *, success=False):
self.shutdown_event.set()
self.scan_done_event.set()
if self.readout_thread is not None:
self.readout_thread.join()
return super().stop(success=success)
if __name__ == "__main__":
rtcontroller = RtFlomniController(
socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222, device_manager=None

View File

@@ -13,6 +13,14 @@ from ophyd_devices import PSIDeviceBase
logger = bec_logger.logger
class MonitorSignal(Signal):
"""A simple wrapper around ophyd Signal that automatically monitors the signal for changes."""
def __init__(self, *, name, auto_monitor=False, **kwargs):
super().__init__(name=name, **kwargs)
self.auto_monitor = auto_monitor
class OMNYFastShutter(PSIDeviceBase, Device):
"""
Fast Shutter control for OMNY setup. If started with at the beamline, it will expose
@@ -26,7 +34,7 @@ class OMNYFastShutter(PSIDeviceBase, Device):
SUB_VALUE = "value"
_default_sub = SUB_VALUE
shutter = Cpt(Signal, name="shutter")
shutter = Cpt(MonitorSignal, name="shutter", auto_monitor=True)
# -----------------------------------------------------
# User-facing shutter control functions
@@ -48,7 +56,6 @@ class OMNYFastShutter(PSIDeviceBase, Device):
def fshopen(self):
"""Open the fast shutter."""
if self._check_if_cSAXS_shutter_exists_in_config():
self.shutter.put(1)
return self.device_manager.devices["fsh"].fshopen()
else:
self.shutter.put(1)
@@ -56,7 +63,6 @@ class OMNYFastShutter(PSIDeviceBase, Device):
def fshclose(self):
"""Close the fast shutter."""
if self._check_if_cSAXS_shutter_exists_in_config():
self.shutter.put(0)
return self.device_manager.devices["fsh"].fshclose()
else:
self.shutter.put(0)

View File

@@ -0,0 +1,172 @@
"""Module for a BPM pseudo device that computes the position and intensity from the blade signals."""
from ophyd import Component as Cpt
from ophyd import Kind, Signal
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
class BPM(PSIPseudoDeviceBase):
"""BPM positioner pseudo device."""
# Blade signals, a,b,c,d
left_top = Cpt(
BECProcessedSignal,
name="left_top",
model_config=None,
kind=Kind.config,
doc="BPM left_top blade",
)
right_top = Cpt(
BECProcessedSignal,
name="right_top",
model_config=None,
kind=Kind.config,
doc="BPM right_top blade",
)
right_bot = Cpt(
BECProcessedSignal,
name="right_bot",
model_config=None,
kind=Kind.config,
doc="BPM right_bottom blade",
)
left_bot = Cpt(
BECProcessedSignal,
name="left_bot",
model_config=None,
kind=Kind.config,
doc="BPM left_bot blade",
)
# Virtual signals
pos_x = Cpt(
BECProcessedSignal,
name="pos_x",
model_config=None,
kind=Kind.config,
doc="BPM X position, -1 fully left, 1 fully right",
)
pos_y = Cpt(
BECProcessedSignal,
name="pos_y",
model_config=None,
kind=Kind.config,
doc="BPM Y position, -1 fully bottom, 1 fully top",
)
diagonal = Cpt(
BECProcessedSignal,
name="diagonal",
model_config=None,
kind=Kind.config,
doc="BPM diagonal, -1 fully diagonal left_top-right_bot, 1 fully diagonal right_top-left_bot",
)
intensity = Cpt(
BECProcessedSignal,
name="intensity",
model_config=None,
kind=Kind.config,
doc="BPM intensity",
)
def __init__(
self,
name,
left_top: str,
right_top: str,
right_bot: str,
left_bot: str,
device_manager=None,
scan_info=None,
**kwargs,
):
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
# Get all blade signal objects from utility method
signal_t = self.left_top.get_device_object_from_bec(
object_name=left_top, signal_name=self.name, device_manager=device_manager
)
signal_r = self.right_top.get_device_object_from_bec(
object_name=right_top, signal_name=self.name, device_manager=device_manager
)
signal_b = self.right_bot.get_device_object_from_bec(
object_name=right_bot, signal_name=self.name, device_manager=device_manager
)
signal_l = self.left_bot.get_device_object_from_bec(
object_name=left_bot, signal_name=self.name, device_manager=device_manager
)
# Set compute methods for blade signals and virtual signals
self.left_top.set_compute_method(self._compute_blade_signal, signal=signal_t)
self.right_top.set_compute_method(self._compute_blade_signal, signal=signal_r)
self.right_bot.set_compute_method(self._compute_blade_signal, signal=signal_b)
self.left_bot.set_compute_method(self._compute_blade_signal, signal=signal_l)
self.intensity.set_compute_method(
self._compute_intensity,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
left_bot=self.left_bot,
)
self.pos_x.set_compute_method(
self._compute_pos_x,
left_bot=self.left_bot,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
)
self.pos_y.set_compute_method(
self._compute_pos_y,
left_bot=self.left_bot,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
)
self.diagonal.set_compute_method(
self._compute_diagonal,
left_bot=self.left_bot,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
)
def _compute_blade_signal(self, signal: Signal) -> float:
return signal.get()
def _compute_intensity(
self, left_top: Signal, right_top: Signal, right_bot: Signal, left_bot: Signal
) -> float:
intensity = left_top.get() + right_top.get() + right_bot.get() + left_bot.get()
return intensity
def _compute_pos_x(
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
) -> float:
"""X position from -1 to 1, where -1 means beam fully on the left side, 1 means beam fully on the right side."""
sum_left = left_bot.get() + left_top.get()
sum_right = right_top.get() + right_bot.get()
sum_total = sum_left + sum_right
if sum_total == 0:
return 0.0
return (sum_right - sum_left) / sum_total
def _compute_pos_y(
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
) -> float:
"""Y position from -1 to 1, where -1 means beam fully on the bottom side, 1 means beam fully on the top side."""
sum_top = left_top.get() + right_top.get()
sum_bot = right_bot.get() + left_bot.get()
sum_total = sum_top + sum_bot
if sum_total == 0:
return 0.0
return (sum_top - sum_bot) / sum_total
def _compute_diagonal(
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
) -> float:
sum_diag1 = left_bot.get() + right_top.get()
sum_diag2 = left_top.get() + right_bot.get()
sum_total = sum_diag1 + sum_diag2
if sum_total == 0:
return 0.0
return (sum_diag1 - sum_diag2) / sum_total

View File

@@ -0,0 +1,189 @@
"""
Module for controlling the BPM amplifier settings, such as gain and coupling.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Literal
from ophyd import Component as Cpt
from ophyd import Kind
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
from bec_server.device_server.devices.devicemanager import DeviceManagerDS
from ophyd import Signal
_GAIN_BITS_LOW_NOISE: dict[tuple, int] = {
(0, 0, 0): int(1e3),
(0, 0, 1): int(1e4),
(0, 1, 0): int(1e5),
(0, 1, 1): int(1e6),
(1, 0, 0): int(1e7),
(1, 0, 1): int(1e8),
(1, 1, 0): int(1e9),
}
_GAIN_BITS_HIGH_SPEED: dict[tuple, int] = {
(0, 0, 0): int(1e5),
(0, 0, 1): int(1e6),
(0, 1, 0): int(1e7),
(0, 1, 1): int(1e8),
(1, 0, 0): int(1e9),
(1, 0, 1): int(1e10),
(1, 1, 0): int(1e11),
}
_GAIN_TO_BITS: dict[int, tuple] = {}
for _bits, _gain in _GAIN_BITS_LOW_NOISE.items():
_GAIN_TO_BITS[_gain] = (*_bits, True)
for _bits, _gain in _GAIN_BITS_HIGH_SPEED.items():
if _gain not in _GAIN_TO_BITS: # low-noise takes priority
_GAIN_TO_BITS[_gain] = (*_bits, False)
VALID_GAINS = sorted(_GAIN_TO_BITS.keys())
class BPMControl(PSIPseudoDeviceBase):
"""
BPM amplifier control pseudo device. It is responsible for controlling the
gain and coupling for the BPM amplifier. It relies on signals from a device
in BEC to be available. For cSAXS, these are most liikely to be from the
GalilRIO device that controls the BPM amplifier.
Args:
name (str): Name of the pseudo device.
gain_lsb (str): Name of the signal in BEC that controls the LSB
of the gain setting.
gain_mid (str): Name of the signal in BEC that controls the MID
bit of the gain setting.
gain_msb (str): Name of the signal in BEC that controls the MSB
of the gain setting.
coupling (str): Name of the signal in BEC that controls the coupling
setting.
speed_mode (str): Name of the signal in BEC that controls the speed mode
(low-noise vs high-speed) of the amplifier.
"""
USER_ACCESS = ["set_gain", "set_coupling"]
gain = Cpt(
BECProcessedSignal,
name="gain",
model_config=None,
kind=Kind.config,
doc="Gain of the amplifier",
)
coupling = Cpt(
BECProcessedSignal,
name="coupling",
model_config=None,
kind=Kind.config,
doc="Coupling of the amplifier",
)
speed = Cpt(
BECProcessedSignal,
name="speed",
model_config=None,
kind=Kind.config,
doc="Speed of the amplifier",
)
def __init__(
self,
name: str,
gain_lsb: str,
gain_mid: str,
gain_msb: str,
coupling: str,
speed_mode: str,
device_manager: DeviceManagerDS | None = None,
scan_info: ScanInfo | None = None,
**kwargs,
):
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
# First we get all signal objects from BEC using the utility method provided by the BECProcessedSignal class.
self._gain_lsb = self.gain.get_device_object_from_bec(
object_name=gain_lsb, signal_name=self.name, device_manager=device_manager
)
self._gain_mid = self.gain.get_device_object_from_bec(
object_name=gain_mid, signal_name=self.name, device_manager=device_manager
)
self._gain_msb = self.gain.get_device_object_from_bec(
object_name=gain_msb, signal_name=self.name, device_manager=device_manager
)
self._coupling = self.gain.get_device_object_from_bec(
object_name=coupling, signal_name=self.name, device_manager=device_manager
)
self._speed_mode = self.gain.get_device_object_from_bec(
object_name=speed_mode, signal_name=self.name, device_manager=device_manager
)
# Set the compute methods for the virtual signals.
self.gain.set_compute_method(
self._compute_gain,
msb=self._gain_msb,
mid=self._gain_mid,
lsb=self._gain_lsb,
speed_mode=self._speed_mode,
)
self.coupling.set_compute_method(self._compute_coupling, coupling=self._coupling)
self.speed.set_compute_method(self._compute_speed, speed=self._speed_mode)
def set_gain(
self,
gain: Literal[
1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000, 100000000000
],
) -> None:
"""
Set the gain of the amplifier.
Args:
gain (Literal): Must be one of 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000.
"""
gain_int = int(gain)
if gain_int not in VALID_GAINS:
raise ValueError(
f"{self.name} received invalid gain {gain_int}, must be in {VALID_GAINS}"
)
msb, mid, lsb, use_low_noise = _GAIN_TO_BITS[gain_int]
self._gain_msb.set(bool(msb)).wait(timeout=2)
self._gain_lsb.set(bool(lsb)).wait(timeout=2)
self._gain_mid.set(bool(mid)).wait(timeout=2)
self._speed_mode.set(bool(use_low_noise))
def set_coupling(self, coupling: Literal["AC", "DC"]) -> None:
"""
Set the coupling of the amplifier.
Args:
coupling (Literal): Must be either "AC" or "DC".
"""
if coupling not in ["AC", "DC"]:
raise ValueError(
f"{self.name} received invalid coupling value {coupling}, please use 'AC' or 'DC'"
)
self._coupling.set(coupling == "DC").wait(timeout=2)
def _compute_gain(self, msb: Signal, mid: Signal, lsb: Signal, speed_mode: Signal) -> int:
"""Compute the gain based on the bits and speed mode."""
bits = (msb.get(), mid.get(), lsb.get())
speed_mode = speed_mode.get()
if speed_mode:
return _GAIN_BITS_LOW_NOISE.get(bits)
else:
return _GAIN_BITS_HIGH_SPEED.get(bits)
def _compute_coupling(self, coupling: Signal) -> str:
"""Compute the coupling based on the signal."""
return "DC" if coupling.get() else "AC"
def _compute_speed(self, speed: Signal) -> str:
"""Compute the speed based on the signal."""
return "low_speed" if speed.get() else "high_speed"

View File

@@ -0,0 +1 @@
# from ophyd

View File

@@ -0,0 +1,41 @@
"""
Pseudo device that forwards a single BEC signal 1:1.
"""
from ophyd import Component as Cpt
from ophyd import Kind, Signal
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
class SignalForwarder(PSIPseudoDeviceBase):
"""Forward one signal unchanged."""
signal = Cpt(
BECProcessedSignal,
name="signal",
model_config=None,
kind=Kind.config,
doc="Forwarded signal",
)
def __init__(
self,
name,
signal: str,
device_manager=None,
scan_info=None,
**kwargs,
):
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
src = self.signal.get_device_object_from_bec(
object_name=signal,
signal_name=self.name,
device_manager=device_manager,
)
self.signal.set_compute_method(self._compute_signal, signal=src)
def _compute_signal(self, signal: Signal) -> float:
return signal.get()

View File

@@ -1 +1 @@
from .csaxs_nexus import NeXus_format as cSAXS_NeXus_format
from .csaxs_nexus import cSAXSNeXusFormat

View File

@@ -1,445 +1,470 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any
import numpy as np
if TYPE_CHECKING:
from bec_lib.devicemanager import DeviceManagerBase
from bec_server.file_writer.file_writer import HDF5Storage
from bec_server.file_writer.default_writer import DefaultFormat
def get_entry(data: dict, name: str, default=None) -> Any:
class cSAXSNeXusFormat(DefaultFormat):
"""
Get an entry from the scan data assuming a <device>.<device>.value structure.
NeXus file format for the cSAXS beamline (BEC era).
Args:
data (dict): Scan data
name (str): Entry name
default (Any, optional): Default value. Defaults to None.
Mirrors the old SPEC layout.xml hierarchy and adds the flOMNI instrument
group for the nano-positioning stage used in ptychography.
Device resilience
-----------------
Every device read (self.get_entry / device call) is wrapped in try/except.
If a device is removed from the BEC config file between sessions it simply
disappears from the device_manager — the corresponding dataset or link is
silently omitted from the HDF5 file without raising an error. This means
the file structure is additive: re-add the device to the config and the
field reappears automatically on the next scan.
Top-level HDF5 structure
────────────────────────
/entry NXentry (definition = NXptycho)
/sample NXsample ← primary sample group
/entry_ptycho NXentry ← generic ptycho entry
/data_soft NXentry ← convenience Eiger frame links
/control NXmonitor
/instrument NXinstrument
/source
/insertion_device
/monochromator
/XBPM3
/slit_3 … slit_5
/filter_set
/beam_stop_1 … beam_stop_2
/eiger_1_5 NXdetector
/mcs NXdetector
/flOMNI NXpositioner
Device name mapping (old SPEC → current BEC)
────────────────────────────────────────────
samx / samy → samx / samy (generic; kept for non-flOMNI configs)
sl3wh/wv/ch/cv → sl3trxi/o/b/t (individual blade motors; gap/centre TODO)
sl4wh/wv/ch/cv → sl4trxi/o/b/t
sl5wh/wv/ch/cv → sl5trxi/o/b/t
bs1x / bs1y → bs1x / bs1y
bs2x / bs2y → bs2x / bs2y
dettrx → dettrx
eiger_4 → eiger_1_5
mcs → mcs
filter_array → filter_array_1_x … filter_array_4_x
xbpm3 → xbpm3x / xbpm3y (stage positions; signal readouts TODO)
energy → ccm_energy
TODO (devices not yet in BEC list)
───────────────────────────────────
curr, idgap ring current, undulator gap
moth1, mobd monochromator crystal angles
mith, mibd, mirror_coating mirror
bpm3s/x/y/z XBPM3 signal readouts
sl0 / sl1 / sl2 upstream optics-hutch slits
slit gap / centre derived from blade pairs + calibration offset
"""
if isinstance(data.get(name), list) and isinstance(data.get(name)[0], dict):
return [sub_data.get(name, {}).get("value", default) for sub_data in data.get(name)]
return data.get(name, {}).get(name, {}).get("value", default)
# -------------------------------------------------------------------------
# Helpers
# -------------------------------------------------------------------------
def _safe_dataset(self, group, name: str, device: str,
units: str | None = None,
description: str | None = None) -> None:
"""
Write a dataset from the BEC scan data dictionary.
Silently skips if the device was not recorded in this scan
(e.g. removed from config, readoutPriority=on_request and not triggered,
or the scan finished before the device responded).
"""
try:
value = self.get_entry(device)
ds = group.create_dataset(name, data=value)
if units:
ds.attrs["units"] = units
if description:
ds.attrs["description"] = description
except Exception:
pass
def _safe_soft_link(self, group, name: str, target: str) -> None:
"""Create a soft link; silently skip on any error."""
try:
group.create_soft_link(name, target)
except Exception:
pass
def _slit_blades(self, group, prefix: str) -> None:
"""
Store individual blade motor positions for a 4-blade slit set.
Derived quantities (gap, centre) require a per-slit calibration offset
and will be added in a later update.
"""
for blade, motor in [
("inner_x", f"{prefix}trxi"),
("outer_x", f"{prefix}trxo"),
("bottom_y", f"{prefix}trxb"),
("top_y", f"{prefix}trxt"),
]:
self._safe_dataset(group, blade, motor, units="mm")
def NeXus_format(
storage: HDF5Storage, data: dict, file_references: dict, device_manager: DeviceManagerBase
) -> HDF5Storage:
"""
Prepare the NeXus file format.
# -------------------------------------------------------------------------
# Main format method
# -------------------------------------------------------------------------
Args:
storage (HDF5Storage): HDF5 storage. Pseudo hdf5 file container that will be written to disk later.
data (dict): scan data
file_references (dict): File references. Can be used to add external files to the HDF5 file. The path is given relative to the HDF5 file.
device_manager (DeviceManagerBase): Device manager. Can be used to check if devices are available.
def format(self) -> None:
"""Build the NeXus/HDF5 layout for a cSAXS scan."""
Returns:
HDF5Storage: Updated HDF5 storage
"""
# /entry
entry = storage.create_group("entry")
entry.attrs["NX_class"] = "NXentry"
entry.attrs["definition"] = "NXsas"
entry.attrs["start_time"] = data.get("start_time")
entry.attrs["end_time"] = data.get("end_time")
entry.attrs["version"] = 1.0
# Canonical paths referenced by multiple groups
RT_POS_PATH = "/entry/instrument/flOMNI/rt_positions"
EIGER_COLL = "/entry/collection/file_references/eiger_1_5"
# /entry/collection
collection = entry.create_group("collection")
collection.attrs["NX_class"] = "NXcollection"
bec_collection = collection.create_group("bec")
# ── Root entry ────────────────────────────────────────────────────────
entry = self.storage.create_group("entry")
entry.attrs["NX_class"] = "NXentry"
entry.attrs["definition"] = "NXptycho"
# /entry/control
control = entry.create_group("control")
control.attrs["NX_class"] = "NXmonitor"
control.create_dataset(name="mode", data="monitor")
control.create_dataset(name="integral", data=get_entry(data, "bpm4i"))
# ── /entry/sample ─────────────────────────────────────────────────────
# Primary sample group. Contains the name of the mounted sample and a
# link to the real-time scan positions. Generic samx/samy are recorded
# here so the group is meaningful for non-flOMNI configurations too.
sample = entry.create_group("sample")
sample.attrs["NX_class"] = "NXsample"
# Soft-link name directly to the value BEC recorded in the collection.
# Only written when flomni_samples is present; other configs leave name absent.
if "flomni_samples" in self.device_manager.devices:
self._safe_soft_link(
sample, "name",
"/entry/collection/devices/flomni_samples"
"/flomni_samples_sample_names_sample0/value",
)
# Generic coarse stage positions (meaningful in non-flOMNI setups)
self._safe_dataset(sample, "x_translation", "samx", units="mm")
self._safe_dataset(sample, "y_translation", "samy", units="mm")
# Real-time encoder positions — the primary scan coordinate
self._safe_soft_link(sample, "positions", RT_POS_PATH)
# /entry/data
main_data = entry.create_group("data")
main_data.attrs["NX_class"] = "NXdata"
if "eiger_4" in device_manager.devices:
main_data.create_soft_link(name="data", target="/entry/instrument/eiger_4/data")
elif "eiger9m" in device_manager.devices:
main_data.create_soft_link(name="data", target="/entry/instrument/eiger9m/data")
elif "pilatus_2" in device_manager.devices:
main_data.create_soft_link(name="data", target="/entry/instrument/pilatus_2/data")
# ── /entry/entry_ptycho ───────────────────────────────────────────────
# Generic ptychography entry. Detector data and scan positions are
# linked in from the instrument groups so this entry is self-contained
# for downstream reconstruction codes.
entry_ptycho = entry.create_group("entry_ptycho")
entry_ptycho.attrs["NX_class"] = "NXentry"
entry_ptycho.attrs["definition"] = "NXptycho"
# /entry/sample
control = entry.create_group("sample")
control.attrs["NX_class"] = "NXsample"
control.create_dataset(name="name", data=get_entry(data, "samplename"))
control.create_dataset(name="description", data=data.get("sample_description"))
x_translation = control.create_dataset(name="x_translation", data=get_entry(data, "samx"))
x_translation.attrs["units"] = "mm"
y_translation = control.create_dataset(name="y_translation", data=get_entry(data, "samy"))
y_translation.attrs["units"] = "mm"
temperature_log = control.create_dataset(name="temperature_log", data=get_entry(data, "temp"))
temperature_log.attrs["units"] = "K"
nxdata = entry_ptycho.create_group("data")
nxdata.attrs["NX_class"] = "NXdata"
nxdata.attrs["signal"] = "data"
# Detector frames
try:
for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys():
self._safe_soft_link(nxdata, k, f"{EIGER_COLL}/{k}")
except Exception:
pass
# Scan positions
self._safe_soft_link(nxdata, "positions", RT_POS_PATH)
# /entry/instrument
instrument = entry.create_group("instrument")
instrument.attrs["NX_class"] = "NXinstrument"
instrument.create_dataset(name="name", data="cSAXS beamline")
# Link to the primary sample group
self._safe_soft_link(entry_ptycho, "sample", "/entry/sample")
source = instrument.create_group("source")
source.attrs["NX_class"] = "NXsource"
source.create_dataset(name="type", data="Synchrotron X-ray Source")
source.create_dataset(name="name", data="Swiss Light Source")
source.create_dataset(name="probe", data="x-ray")
distance = source.create_dataset(
name="distance", data=-33800 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
sigma_x = source.create_dataset(name="sigma_x", data=0.202)
sigma_x.attrs["units"] = "mm"
sigma_y = source.create_dataset(name="sigma_y", data=0.018)
sigma_y.attrs["units"] = "mm"
divergence_x = source.create_dataset(name="divergence_x", data=0.000135)
divergence_x.attrs["units"] = "radians"
divergence_y = source.create_dataset(name="divergence_y", data=0.000025)
divergence_y.attrs["units"] = "radians"
current = source.create_dataset(name="current", data=get_entry(data, "curr"))
current.attrs["units"] = "mA"
# ── /entry/data_soft ──────────────────────────────────────────────────
# Convenience group mirroring the old /entry/data hardlink from layout.xml.
data_soft = entry.create_group("data_soft")
data_soft.attrs["NX_class"] = "NXentry"
try:
for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys():
self._safe_soft_link(data_soft, k, f"{EIGER_COLL}/{k}")
except Exception:
pass
insertion_device = instrument.create_group("insertion_device")
insertion_device.attrs["NX_class"] = "NXinsertion_device"
source.create_dataset(name="type", data="undulator")
gap = source.create_dataset(name="gap", data=get_entry(data, "idgap"))
gap.attrs["units"] = "mm"
k = source.create_dataset(name="k", data=2.46)
k.attrs["units"] = "NX_DIMENSIONLESS"
length = source.create_dataset(name="length", data=1820)
length.attrs["units"] = "mm"
# ── /entry/control ────────────────────────────────────────────────────
control = entry.create_group("control")
control.attrs["NX_class"] = "NXmonitor"
control.create_dataset("mode", data="monitor")
# TODO: beam intensity integral — add device when available
# self._safe_dataset(control, "integral", "bpm_sum", units="NX_DIMENSIONLESS")
slit_0 = instrument.create_group("slit_0")
slit_0.attrs["NX_class"] = "NXslit"
source.create_dataset(name="material", data="OFHC Cu")
source.create_dataset(name="description", data="Horizontal secondary source slit")
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl0wh"))
x_gap.attrs["units"] = "mm"
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl0ch"))
x_translation.attrs["units"] = "mm"
distance = source.create_dataset(
name="distance", data=-21700 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
# ── /entry/instrument ─────────────────────────────────────────────────
instrument = entry.create_group("instrument")
instrument.attrs["NX_class"] = "NXinstrument"
instrument.create_dataset("name", data="cSAXS beamline")
slit_1 = instrument.create_group("slit_1")
slit_1.attrs["NX_class"] = "NXslit"
source.create_dataset(name="material", data="OFHC Cu")
source.create_dataset(name="description", data="Horizontal secondary source slit")
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl1wh"))
x_gap.attrs["units"] = "mm"
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl1wv"))
y_gap.attrs["units"] = "mm"
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl1ch"))
x_translation.attrs["units"] = "mm"
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl1ch"))
height.attrs["units"] = "mm"
distance = source.create_dataset(
name="distance", data=-7800 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
# ── Source ────────────────────────────────────────────────────────────
# Numerical values are currently unknown and stored as 0.
# Will be updated once the corresponding devices are in BEC.
source = instrument.create_group("source")
source.attrs["NX_class"] = "NXsource"
source.create_dataset("type", data="Synchrotron X-ray Source")
source.create_dataset("name", data="Swiss Light Source")
source.create_dataset("probe", data="x-ray")
source.create_dataset("sigma_x", data=0.0).attrs["units"] = "mm"
source.create_dataset("sigma_y", data=0.0).attrs["units"] = "mm"
source.create_dataset("divergence_x", data=0.0).attrs["units"] = "radians"
source.create_dataset("divergence_y", data=0.0).attrs["units"] = "radians"
# TODO: current — add device when available
# self._safe_dataset(source, "current", "curr", units="mA")
mono = instrument.create_group("monochromator")
mono.attrs["NX_class"] = "NXmonochromator"
mokev = data.get("mokev", {})
if mokev:
if isinstance(mokev, list):
mokev = mokev[0]
wavelength = mono.create_dataset(
name="wavelength", data=12.3984193 / (mokev.get("mokev").get("value") + 1e-9)
# ── Insertion device ──────────────────────────────────────────────────
insertion_device = instrument.create_group("insertion_device")
insertion_device.attrs["NX_class"] = "NXinsertion_device"
insertion_device.create_dataset("type", data="undulator")
insertion_device.create_dataset("k", data=0.0)
insertion_device.create_dataset("length", data=0.0).attrs["units"] = "mm"
# TODO: gap — add device when available
# self._safe_dataset(insertion_device, "gap", "idgap", units="mm")
# ── Monochromator ─────────────────────────────────────────────────────
# ccm_energy is a baseline device and is recorded in the scan data.
mono = instrument.create_group("monochromator")
mono.attrs["NX_class"] = "NXmonochromator"
mono.create_dataset("type", data="Double crystal fixed exit monochromator.")
try:
energy_kev = self.get_entry("ccm_energy")
energy_arr = np.asarray(energy_kev, dtype=float)
en_ds = mono.create_dataset("energy", data=energy_arr)
en_ds.attrs["units"] = "keV"
with np.errstate(divide="ignore", invalid="ignore"):
wavelength = np.where(energy_arr != 0, 12.3984193 / energy_arr, 0.0)
wl_ds = mono.create_dataset("wavelength", data=wavelength)
wl_ds.attrs["units"] = "Angstrom"
except Exception:
pass
# TODO: crystal angles — add moth1 / mobd when available
# crystal_1 = mono.create_group("crystal_1")
# crystal_1.attrs["NX_class"] = "NXcrystal"
# crystal_1.create_dataset("usage", data="Bragg")
# crystal_1.create_dataset("type", data="Si")
# crystal_1.create_dataset("order_no", data=1.0)
# crystal_1.create_dataset("reflection", data="[1 1 1]")
# self._safe_dataset(crystal_1, "bragg_angle", "moth1", units="degrees")
# crystal_2 = mono.create_group("crystal_2")
# crystal_2.attrs["NX_class"] = "NXcrystal"
# crystal_2.create_dataset("usage", data="Bragg")
# crystal_2.create_dataset("type", data="Si")
# crystal_2.create_dataset("order_no", data=2.0)
# crystal_2.create_dataset("reflection", data="[1 1 1]")
# self._safe_dataset(crystal_2, "bragg_angle", "moth1", units="degrees")
# self._safe_dataset(crystal_2, "bend_x", "mobd", units="degrees")
# ── Mirror ────────────────────────────────────────────────────────────
# TODO: mith, mibd, mirror_coating not yet in device list
# mirror = instrument.create_group("mirror")
# mirror.attrs["NX_class"] = "NXmirror"
# mirror.create_dataset("type", data="single")
# mirror.create_dataset(
# "description",
# data=(
# "Grazing incidence mirror to reject high-harmonic wavelengths. "
# "Three coating options: no coating (SiO2), rhodium (Rh), platinum (Pt)."
# ),
# )
# mirror.create_dataset("substrate_material", data="SiO2")
# self._safe_dataset(mirror, "incident_angle", "mith", units="degrees")
# self._safe_dataset(mirror, "coating_material", "mirror_coating", units="NX_CHAR")
# self._safe_dataset(mirror, "bend_y", "mibd", units="NX_DIMENSIONLESS")
# ── Upstream slits (optics hutch) ─────────────────────────────────────
# TODO: slit_0 / slit_1 / slit_2 motors not yet in BEC device list
# slit_0 = instrument.create_group("slit_0")
# ...
# slit_1 = instrument.create_group("slit_1")
# ...
# slit_2 = instrument.create_group("slit_2")
# ...
# ── XBPM3 ─────────────────────────────────────────────────────────────
# xbpm3x/xbpm3y are stage motor positions for aligning the monitor.
# Signal readouts (sum/x/y/skew) are TODO once MCS channels are mapped.
xbpm3 = instrument.create_group("XBPM3")
xbpm3.attrs["NX_class"] = "NXdetector"
xbpm3.attrs["description"] = "X-ray beam position monitor 3, experimental hutch"
self._safe_dataset(xbpm3, "x_stage", "xbpm3x", units="mm",
description="XBPM3 stage x-translation")
self._safe_dataset(xbpm3, "y_stage", "xbpm3y", units="mm",
description="XBPM3 stage y-translation")
# TODO: signal readout sub-groups once MCS channels are configured
# for suffix, entry_name, desc in [
# ("sum", "bpm3s", "Sum of counts for the four quadrants."),
# ("x", "bpm3x", "Normalized diff, left vs right quadrants."),
# ("y", "bpm3y", "Normalized diff, high vs low quadrants."),
# ("skew", "bpm3z", "Normalized diff, diagonal quadrants."),
# ]:
# g = xbpm3.create_group(f"XBPM3_{suffix}")
# self._safe_dataset(g, "data", entry_name, units="NX_DIMENSIONLESS")
# g.create_dataset("description", data=desc)
# ── Slit 3 (experimental hutch, exposure box) ─────────────────────────
slit_3 = instrument.create_group("slit_3")
slit_3.attrs["NX_class"] = "NXslit"
slit_3.create_dataset("material", data="Si")
slit_3.create_dataset("description", data="Slit 3, experimental hutch, exposure box")
# TODO: gap / centre require per-slit calibration offset — add later
self._slit_blades(slit_3, "sl3")
# ── Filter set ────────────────────────────────────────────────────────
filter_set = instrument.create_group("filter_set")
filter_set.attrs["NX_class"] = "NXattenuator"
filter_set.create_dataset("material", data="Si")
filter_set.create_dataset(
"description",
data=(
"Four linear filter stages (filter_array_1_x … filter_array_4_x). "
"Each stage has five filter positions plus an 'out' position."
),
)
wavelength.attrs["units"] = "Angstrom"
energy = mono.create_dataset(name="energy", data=mokev.get("mokev").get("value"))
energy.attrs["units"] = "keV"
mono.create_dataset(name="type", data="Double crystal fixed exit monochromator.")
distance = mono.create_dataset(
name="distance", data=-5220 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
for i in range(1, 5):
self._safe_dataset(filter_set, f"stage_{i}_x",
f"filter_array_{i}_x", units="mm")
# TODO: attenuator_transmission = 10^(ftrans) once device is available
crystal_1 = mono.create_group("crystal_1")
crystal_1.attrs["NX_class"] = "NXcrystal"
crystal_1.create_dataset(name="usage", data="Bragg")
crystal_1.create_dataset(name="order_no", data="1")
crystal_1.create_dataset(name="reflection", data="[1 1 1]")
bragg_angle = crystal_1.create_dataset(name="bragg_angle", data=get_entry(data, "moth1"))
bragg_angle.attrs["units"] = "degrees"
# ── Slit 4 (experimental hutch, exposure box) ─────────────────────────
slit_4 = instrument.create_group("slit_4")
slit_4.attrs["NX_class"] = "NXslit"
slit_4.create_dataset("material", data="Ge")
slit_4.create_dataset("description", data="Slit 4, experimental hutch, exposure box")
self._slit_blades(slit_4, "sl4")
crystal_2 = mono.create_group("crystal_2")
crystal_2.attrs["NX_class"] = "NXcrystal"
crystal_2.create_dataset(name="usage", data="Bragg")
crystal_2.create_dataset(name="order_no", data="2")
crystal_2.create_dataset(name="reflection", data="[1 1 1]")
bragg_angle = crystal_2.create_dataset(name="bragg_angle", data=get_entry(data, "moth1"))
bragg_angle.attrs["units"] = "degrees"
bend_x = crystal_2.create_dataset(name="bend_x", data=get_entry(data, "mobd"))
bend_x.attrs["units"] = "degrees"
# ── Slit 5 (experimental hutch, exposure box) ─────────────────────────
slit_5 = instrument.create_group("slit_5")
slit_5.attrs["NX_class"] = "NXslit"
slit_5.create_dataset("material", data="Si")
slit_5.create_dataset("description", data="Slit 5, experimental hutch, exposure box")
self._slit_blades(slit_5, "sl5")
xbpm4 = instrument.create_group("XBPM4")
xbpm4.attrs["NX_class"] = "NXdetector"
xbpm4_sum = xbpm4.create_group("XBPM4_sum")
xbpm4_sum_data = xbpm4_sum.create_dataset(name="data", data=get_entry(data, "bpm4s"))
xbpm4_sum_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm4_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.")
xbpm4_x = xbpm4.create_group("XBPM4_x")
xbpm4_x_data = xbpm4_x.create_dataset(name="data", data=get_entry(data, "bpm4x"))
xbpm4_x_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm4_x.create_dataset(
name="description", data="Normalized difference of counts between left and right quadrants."
)
xbpm4_y = xbpm4.create_group("XBPM4_y")
xbpm4_y_data = xbpm4_y.create_dataset(name="data", data=get_entry(data, "bpm4y"))
xbpm4_y_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm4_y.create_dataset(
name="description", data="Normalized difference of counts between high and low quadrants."
)
xbpm4_skew = xbpm4.create_group("XBPM4_skew")
xbpm4_skew_data = xbpm4_skew.create_dataset(name="data", data=get_entry(data, "bpm4z"))
xbpm4_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm4_skew.create_dataset(
name="description", data="Normalized difference of counts between diagonal quadrants."
)
# ── Beam stop 1 ────────────────────────────────────────────────────────
beam_stop_1 = instrument.create_group("beam_stop_1")
beam_stop_1.attrs["NX_class"] = "NXbeam_stop"
beam_stop_1.create_dataset("description", data="circular")
beam_stop_1.create_dataset("size", data=3.0).attrs["units"] = "mm"
self._safe_dataset(beam_stop_1, "x", "bs1x", units="mm")
self._safe_dataset(beam_stop_1, "y", "bs1y", units="mm")
# TODO: diode signal behind beam stop 1 when device is available
mirror = instrument.create_group("mirror")
mirror.attrs["NX_class"] = "NXmirror"
mirror.create_dataset(name="type", data="single")
mirror.create_dataset(
name="description",
data="Grazing incidence mirror to reject high-harmonic wavelengths from the monochromator. There are three coating options available that are used depending on the X-ray energy, no coating (SiO2), rhodium (Rh) or platinum (Pt).",
)
incident_angle = mirror.create_dataset(name="incident_angle", data=get_entry(data, "mith"))
incident_angle.attrs["units"] = "degrees"
substrate_material = mirror.create_dataset(name="substrate_material", data="SiO2")
substrate_material.attrs["units"] = "NX_CHAR"
coating_material = mirror.create_dataset(name="coating_material", data="SiO2")
coating_material.attrs["units"] = "NX_CHAR"
bend_y = mirror.create_dataset(name="bend_y", data="mibd")
bend_y.attrs["units"] = "NX_DIMENSIONLESS"
distance = mirror.create_dataset(
name="distance", data=-4370 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
# ── Beam stop 2 ────────────────────────────────────────────────────────
beam_stop_2 = instrument.create_group("beam_stop_2")
beam_stop_2.attrs["NX_class"] = "NXbeam_stop"
beam_stop_2.create_dataset("description", data="rectangular")
beam_stop_2.create_dataset("size_x", data=5.0).attrs["units"] = "mm"
beam_stop_2.create_dataset("size_y", data=2.25).attrs["units"] = "mm"
self._safe_dataset(beam_stop_2, "x", "bs2x", units="mm")
self._safe_dataset(beam_stop_2, "y", "bs2y", units="mm")
# TODO: diode (transmitted signal) when device is available
xbpm5 = instrument.create_group("XBPM5")
xbpm5.attrs["NX_class"] = "NXdetector"
xbpm5_sum = xbpm5.create_group("XBPM5_sum")
xbpm5_sum_data = xbpm5_sum.create_dataset(name="data", data=get_entry(data, "bpm5s"))
xbpm5_sum_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm5_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.")
xbpm5_x = xbpm5.create_group("XBPM5_x")
xbpm5_x_data = xbpm5_x.create_dataset(name="data", data=get_entry(data, "bpm5x"))
xbpm5_x_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm5_x.create_dataset(
name="description", data="Normalized difference of counts between left and right quadrants."
)
xbpm5_y = xbpm5.create_group("XBPM5_y")
xbpm5_y_data = xbpm5_y.create_dataset(name="data", data=get_entry(data, "bpm5y"))
xbpm5_y_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm5_y.create_dataset(
name="description", data="Normalized difference of counts between high and low quadrants."
)
xbpm5_skew = xbpm5.create_group("XBPM5_skew")
xbpm5_skew_data = xbpm5_skew.create_dataset(name="data", data=get_entry(data, "bpm5z"))
xbpm5_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm5_skew.create_dataset(
name="description", data="Normalized difference of counts between diagonal quadrants."
)
slit_2 = instrument.create_group("slit_2")
slit_2.attrs["NX_class"] = "NXslit"
source.create_dataset(name="material", data="Ag")
source.create_dataset(name="description", data="Slit 2, optics hutch")
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl2wh"))
x_gap.attrs["units"] = "mm"
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl2wv"))
y_gap.attrs["units"] = "mm"
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl2ch"))
x_translation.attrs["units"] = "mm"
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl2cv"))
height.attrs["units"] = "mm"
distance = source.create_dataset(
name="distance", data=-3140 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
slit_3 = instrument.create_group("slit_3")
slit_3.attrs["NX_class"] = "NXslit"
source.create_dataset(name="material", data="Si")
source.create_dataset(name="description", data="Slit 3, experimental hutch, exposure box")
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl3wh"))
x_gap.attrs["units"] = "mm"
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl3wv"))
y_gap.attrs["units"] = "mm"
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl3ch"))
x_translation.attrs["units"] = "mm"
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl3cv"))
height.attrs["units"] = "mm"
# distance = source.create_dataset(name="distance", data=-3140 - get_entry(data, "samz", 0))
# distance.attrs["units"] = "mm"
filter_set = instrument.create_group("filter_set")
filter_set.attrs["NX_class"] = "NXattenuator"
filter_set.create_dataset(name="material", data="Si")
filter_set.create_dataset(
name="description",
data="The filter set consists of 4 linear stages, each with five filter positions. Additionally, each one allows for an out position to allow 'no filtering'.",
)
attenuator_transmission = filter_set.create_dataset(
name="attenuator_transmission", data=10 ** get_entry(data, "ftrans", 0)
)
attenuator_transmission.attrs["units"] = "NX_DIMENSIONLESS"
slit_4 = instrument.create_group("slit_4")
slit_4.attrs["NX_class"] = "NXslit"
source.create_dataset(name="material", data="Si")
source.create_dataset(name="description", data="Slit 4, experimental hutch, exposure box")
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl4wh"))
x_gap.attrs["units"] = "mm"
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl4wv"))
y_gap.attrs["units"] = "mm"
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl4ch"))
x_translation.attrs["units"] = "mm"
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl4cv"))
height.attrs["units"] = "mm"
# distance = source.create_dataset(name="distance", data=-3140 - get_entry(data, "samz", 0))
# distance.attrs["units"] = "mm"
slit_5 = instrument.create_group("slit_5")
slit_5.attrs["NX_class"] = "NXslit"
source.create_dataset(name="material", data="Si")
source.create_dataset(name="description", data="Slit 5, experimental hutch, exposure box")
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl5wh"))
x_gap.attrs["units"] = "mm"
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl5wv"))
y_gap.attrs["units"] = "mm"
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl5ch"))
x_translation.attrs["units"] = "mm"
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl5cv"))
height.attrs["units"] = "mm"
# distance = source.create_dataset(name="distance", data=-3140 - get_entry(data, "samz", 0))
# distance.attrs["units"] = "mm"
beam_stop_1 = instrument.create_group("beam_stop_1")
beam_stop_1.attrs["NX_class"] = "NX_beamstop"
beam_stop_1.create_dataset(name="description", data="circular")
bms1_size = beam_stop_1.create_dataset(name="size", data=3)
bms1_size.attrs["units"] = "mm"
bms1_x = beam_stop_1.create_dataset(name="size", data=get_entry(data, "bs1x"))
bms1_x.attrs["units"] = "mm"
bms1_y = beam_stop_1.create_dataset(name="size", data=get_entry(data, "bs1y"))
bms1_y.attrs["units"] = "mm"
beam_stop_2 = instrument.create_group("beam_stop_2")
beam_stop_2.attrs["NX_class"] = "NX_beamstop"
beam_stop_2.create_dataset(name="description", data="rectangular")
bms2_size_x = beam_stop_2.create_dataset(name="size_x", data=5)
bms2_size_x.attrs["units"] = "mm"
bms2_size_y = beam_stop_2.create_dataset(name="size_y", data=2.25)
bms2_size_y.attrs["units"] = "mm"
bms2_x = beam_stop_2.create_dataset(name="size", data=get_entry(data, "bs2x"))
bms2_x.attrs["units"] = "mm"
bms2_y = beam_stop_2.create_dataset(name="size", data=get_entry(data, "bs2y"))
bms2_y.attrs["units"] = "mm"
bms2_data = beam_stop_2.create_dataset(name="data", data=get_entry(data, "diode"))
bms2_data.attrs["units"] = "NX_DIMENSIONLESS"
if "eiger1p5m" in device_manager.devices and device_manager.devices.eiger1p5m.enabled:
eiger_4 = instrument.create_group("eiger_4")
eiger_4.attrs["NX_class"] = "NXdetector"
x_pixel_size = eiger_4.create_dataset(name="x_pixel_size", data=75)
x_pixel_size.attrs["units"] = "um"
y_pixel_size = eiger_4.create_dataset(name="y_pixel_size", data=75)
y_pixel_size.attrs["units"] = "um"
polar_angle = eiger_4.create_dataset(name="polar_angle", data=0)
polar_angle.attrs["units"] = "degrees"
azimuthal_angle = eiger_4.create_dataset(name="azimuthal_angle", data=0)
azimuthal_angle.attrs["units"] = "degrees"
rotation_angle = eiger_4.create_dataset(name="rotation_angle", data=0)
rotation_angle.attrs["units"] = "degrees"
description = eiger_4.create_dataset(
name="description", data="Single-photon counting detector, 320 micron-thick Si chip"
)
orientation = eiger_4.create_group("orientation")
orientation.attrs["description"] = (
"Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
)
orientation.create_dataset(name="transpose", data=1)
orientation.create_dataset(name="rot90", data=3)
if (
"eiger9m" in device_manager.devices
and device_manager.devices.eiger9m.enabled
and "eiger9m" in file_references
):
eiger9m = instrument.create_group("eiger9m")
eiger9m.attrs["NX_class"] = "NXdetector"
x_pixel_size = eiger9m.create_dataset(name="x_pixel_size", data=75)
x_pixel_size.attrs["units"] = "um"
y_pixel_size = eiger9m.create_dataset(name="y_pixel_size", data=75)
y_pixel_size.attrs["units"] = "um"
polar_angle = eiger9m.create_dataset(name="polar_angle", data=0)
polar_angle.attrs["units"] = "degrees"
azimuthal_angle = eiger9m.create_dataset(name="azimuthal_angle", data=0)
azimuthal_angle.attrs["units"] = "degrees"
rotation_angle = eiger9m.create_dataset(name="rotation_angle", data=0)
rotation_angle.attrs["units"] = "degrees"
description = eiger9m.create_dataset(
name="description", data="Eiger9M detector, in-house developed, Paul Scherrer Institute"
)
orientation = eiger9m.create_group("orientation")
orientation.attrs["description"] = (
"Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
)
orientation.create_dataset(name="transpose", data=1)
orientation.create_dataset(name="rot90", data=3)
data = eiger9m.create_ext_link("data", file_references["eiger9m"]["path"], "EG9M/data")
status = eiger9m.create_ext_link(
"status", file_references["eiger9m"]["path"], "EG9M/status"
# ── Detector translation ───────────────────────────────────────────────
self._safe_dataset(
instrument, "detector_translation_x", "dettrx",
units="mm", description="Detector x-translation stage",
)
if (
"pilatus_2" in device_manager.devices
and device_manager.devices.pilatus_2.enabled
and "pilatus_2" in file_references
):
pilatus_2 = instrument.create_group("pilatus_2")
pilatus_2.attrs["NX_class"] = "NXdetector"
x_pixel_size = pilatus_2.create_dataset(name="x_pixel_size", data=172)
x_pixel_size.attrs["units"] = "um"
y_pixel_size = pilatus_2.create_dataset(name="y_pixel_size", data=172)
y_pixel_size.attrs["units"] = "um"
polar_angle = pilatus_2.create_dataset(name="polar_angle", data=0)
polar_angle.attrs["units"] = "degrees"
azimuthal_angle = pilatus_2.create_dataset(name="azimuthal_angle", data=0)
azimuthal_angle.attrs["units"] = "degrees"
rotation_angle = pilatus_2.create_dataset(name="rotation_angle", data=0)
rotation_angle.attrs["units"] = "degrees"
description = pilatus_2.create_dataset(
name="description", data="Pilatus 300K detector, Dectris, Switzerland"
)
orientation = pilatus_2.create_group("orientation")
orientation.attrs["description"] = (
"Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
)
orientation.create_dataset(name="transpose", data=1)
orientation.create_dataset(name="rot90", data=2)
data = pilatus_2.create_ext_link(
"data", file_references["pilatus_2"]["path"], "entry/instrument/pilatus_2/data"
)
# ── Eiger 1.5M detector ───────────────────────────────────────────────
if (
"eiger_1_5" in self.device_manager.devices
and self.device_manager.devices.eiger_1_5.enabled
and "eiger_1_5" in self.file_references
):
eiger = instrument.create_group("eiger_1_5")
eiger.attrs["NX_class"] = "NXdetector"
eiger.create_dataset("x_pixel_size", data=75.0).attrs["units"] = "um"
eiger.create_dataset("y_pixel_size", data=75.0).attrs["units"] = "um"
eiger.create_dataset("polar_angle", data=0.0).attrs["units"] = "degrees"
eiger.create_dataset("azimuthal_angle", data=0.0).attrs["units"] = "degrees"
eiger.create_dataset("rotation_angle", data=0.0).attrs["units"] = "degrees"
eiger.create_dataset(
"description",
data="Eiger 1.5M detector, in-house developed, Paul Scherrer Institute",
)
eiger.create_dataset(
"type",
data="Single-photon counting detector, 320 micron-thick Si chip",
)
orientation = eiger.create_group("orientation")
orientation.attrs["description"] = (
"Orientation defines the number of counterclockwise rotations by 90 deg "
"followed by a transposition to reach the 'cameraman orientation', "
"looking towards the beam."
)
orientation.create_dataset("transpose", data=1)
orientation.create_dataset("rot90", data=3)
# Soft-link recorded frame data from the BEC collection
try:
for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys():
self._safe_soft_link(eiger, k, f"{EIGER_COLL}/{k}")
except Exception:
pass
# External link to pixel mask in the Eiger master file
try:
eiger.create_ext_link(
"pixel_mask",
self.file_references["eiger_1_5"].file_path,
"/entry/instrument/detector/pixel_mask",
)
except Exception:
pass
if (
"falcon" in device_manager.devices
and device_manager.devices.falcon.enabled
and "falcon" in file_references
):
falcon = instrument.create_ext_link(
"falcon", file_references["falcon"]["path"], "entry/instrument/FalconX1"
)
# ── MCS (multi-channel scaler) ─────────────────────────────────────────
if (
"mcs" in self.device_manager.devices
and self.device_manager.devices.mcs.enabled
):
mcs_group = instrument.create_group("mcs")
mcs_group.attrs["NX_class"] = "NXdetector"
mcs_group.attrs["description"] = "MCS card cSAXS — multi-channel scaler"
self._safe_soft_link(mcs_group, "data", "/entry/collection/devices/mcs")
return storage
# ── flOMNI ────────────────────────────────────────────────────────────
# flomni_samples is used as the sentinel for the entire flOMNI setup.
# If it is absent from the device_manager (removed from config) the
# whole group is omitted. Individual datasets inside are still each
# guarded by _safe_dataset / _safe_soft_link in case a specific motor
# is temporarily disabled without removing the full setup.
if "flomni_samples" in self.device_manager.devices:
flomni = instrument.create_group("flOMNI")
flomni.attrs["NX_class"] = "NXpositioner"
flomni.attrs["description"] = "flOMNI flexible tOMography Nano Imaging"
# Galil motors — coarse sample stage
self._safe_dataset(flomni, "fsamx", "fsamx", units="mm", description="Sample coarse X")
self._safe_dataset(flomni, "fsamy", "fsamy", units="mm", description="Sample coarse Y")
self._safe_dataset(flomni, "fsamroy", "fsamroy", units="degrees", description="Sample rotation")
# Galil motors — sample transfer / tray
self._safe_dataset(flomni, "ftransx", "ftransx", units="mm", description="Sample transfer X")
self._safe_dataset(flomni, "ftransy", "ftransy", units="mm", description="Sample transfer Y")
self._safe_dataset(flomni, "ftransz", "ftransz", units="mm", description="Sample transfer Z")
self._safe_dataset(flomni, "ftray", "ftray", units="mm", description="Sample transfer tray")
# Galil motors — laser tracker
self._safe_dataset(flomni, "ftracky", "ftracky", units="mm", description="Laser tracker coarse Y")
self._safe_dataset(flomni, "ftrackz", "ftrackz", units="mm", description="Laser tracker coarse Z")
# Galil motors — X-ray eye
self._safe_dataset(flomni, "feyex", "feyex", units="mm", description="X-ray eye X")
self._safe_dataset(flomni, "feyey", "feyey", units="mm", description="X-ray eye Y")
# Galil motors — optics (zone plate)
self._safe_dataset(flomni, "foptx", "foptx", units="mm", description="Optics X")
self._safe_dataset(flomni, "fopty", "fopty", units="mm", description="Optics Y")
self._safe_dataset(flomni, "foptz", "foptz", units="mm", description="Optics Z")
# Galil motor — heater
self._safe_dataset(flomni, "fheater", "fheater", units="mm", description="Heater Y")
# Smaract motors — OSA (order-sorting aperture)
self._safe_dataset(flomni, "fosax", "fosax", units="mm", description="OSA X")
self._safe_dataset(flomni, "fosay", "fosay", units="mm", description="OSA Y")
self._safe_dataset(flomni, "fosaz", "fosaz", units="mm", description="OSA Z")
# Temperature and humidity sensor (soft link to BEC collection entry)
self._safe_soft_link(
flomni, "flomni_temphum",
"/entry/collection/devices/flomni_temphum",
)
# Real-time encoder positions (RtFlomniFlyer)
# Single soft link to the entire rt_positions folder in the BEC
# collection. This is the primary scan coordinate for ptychography.
self._safe_soft_link(
flomni, "rt_positions",
"/entry/collection/devices/rt_positions",
)

View File

@@ -27,20 +27,19 @@ from bec_lib import bec_logger, messages
from bec_lib.alarm_handler import Alarms
from bec_lib.endpoints import MessageEndpoints
from bec_server.scan_server.errors import ScanAbortion
from bec_server.scan_server.scans import SyncFlyScanBase
from bec_server.scan_server.scans import AsyncFlyScanBase
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import TRIGGERSOURCE
logger = bec_logger.logger
class FlomniFermatScan(SyncFlyScanBase):
class FlomniFermatScan(AsyncFlyScanBase):
scan_name = "flomni_fermat_scan"
scan_type = "fly"
required_kwargs = ["fovx", "fovy", "exp_time", "step", "angle"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
use_scan_progress_report = True
def __init__(
self,
@@ -104,6 +103,14 @@ class FlomniFermatScan(SyncFlyScanBase):
self.zshift = -100
self.flomni_rotation_status = None
def scan_report_instructions(self):
"""Scan report instructions for the progress bar"""
yield from self.stubs.scan_report_instruction({"device_progress": ["rt_positions"]})
@property
def monitor_sync(self) -> str:
return "rt_positions"
def initialize(self):
self.scan_motors = []
self.update_readout_priority()
@@ -113,10 +120,6 @@ class FlomniFermatScan(SyncFlyScanBase):
self.positions, corridor_size=self.optim_trajectory_corridor
)
@property
def monitor_sync(self):
return "rt_flomni"
def reverse_trajectory(self):
"""
Reverse the trajectory. Every other scan should be reversed to
@@ -290,26 +293,18 @@ class FlomniFermatScan(SyncFlyScanBase):
return np.array(positions)
def scan_core(self):
# use a device message to receive the scan number and
# scan ID before sending the message to the device server
yield from self.stubs.kickoff(device="rtx")
while True:
yield from self.stubs.read(group="monitored")
status = self.connector.get(MessageEndpoints.device_status("rt_scan"))
if status:
status_id = status.content.get("status", 1)
request_id = status.metadata.get("RID")
if status_id == 0 and self.metadata.get("RID") == request_id:
break
if status_id == 2 and self.metadata.get("RID") == request_id:
raise ScanAbortion(
"An error occured during the flomni readout:"
f" {status.metadata.get('error')}"
)
# send off the flyer
yield from self.stubs.kickoff(device="rt_positions")
# start the readout loop of the flyer
status = yield from self.stubs.complete(device="rt_positions", wait=False)
# read the monitors until the flyer is done
while not status.done:
yield from self.stubs.read(group="monitored", point_id=self.point_id)
self.point_id += 1
time.sleep(1)
logger.debug("reading monitors")
# yield from self.device_rpc("rtx", "controller.kickoff")
def move_to_start(self):
"""return to the start position"""
@@ -336,6 +331,7 @@ class FlomniFermatScan(SyncFlyScanBase):
yield from self.read_scan_motors()
self.prepare_positions()
yield from self._prepare_setup()
yield from self.scan_report_instructions()
yield from self.open_scan()
yield from self.stage()
yield from self.run_baseline_reading()

View File

@@ -217,6 +217,16 @@ def test_mcs_card_csaxs_complete_and_stop(mock_mcs_csaxs: MCSCardCSAXS):
assert not mcs._start_monitor_async_data_emission.is_set()
def test_mcs_on_stop(mock_mcs_csaxs: MCSCardCSAXS):
"""Test that on stop sets the omit_mca_callbacks flag. Also test that on stage clears the omit_mca_callbacks flag."""
mcs = mock_mcs_csaxs
assert mcs._omit_mca_callbacks.is_set() is False
mcs.stop()
assert mcs._omit_mca_callbacks.is_set() is True
mcs.stage()
assert mcs._omit_mca_callbacks.is_set() is False
def test_mcs_recovery(mock_mcs_csaxs: MCSCardCSAXS):
mcs = mock_mcs_csaxs
# Simulate ongoing acquisition

View File

@@ -0,0 +1,69 @@
import pytest
from bec_server.device_server.tests.utils import DMMock
from csaxs_bec.devices.omny.shutter import MonitorSignal, OMNYFastShutter
@pytest.mark.parametrize("auto_monitor", [False, True])
def test_monitor_signal_stores_auto_monitor(auto_monitor):
signal = MonitorSignal(name="signal", auto_monitor=auto_monitor)
assert signal.auto_monitor is auto_monitor
def test_monitor_signal_put_propagates_value_to_readback_callback():
signal = MonitorSignal(name="signal", auto_monitor=True)
initial_value = signal.read()[signal.name]["value"]
callback_values = []
callback_reads = []
def _test_cb(value, old_value, **kwargs):
callback_values.append((value, old_value))
callback_reads.append(kwargs["obj"].read())
signal.subscribe(_test_cb, event_type=signal.SUB_VALUE, run=False)
signal.put(1)
assert callback_values == [(1, initial_value)]
assert len(callback_reads) == 1
assert callback_reads[0][signal.name]["value"] == 1
assert signal.read()[signal.name]["value"] == 1
signal.put(0)
assert callback_values == [(1, initial_value), (0, 1)]
assert len(callback_reads) == 2
assert callback_reads[1][signal.name]["value"] == 0
assert signal.read()[signal.name]["value"] == 0
@pytest.fixture
def omny_fast_shutter():
shutter = OMNYFastShutter(name="omny_fast_shutter", device_manager=DMMock())
try:
yield shutter
finally:
shutter.destroy()
def test_omny_fast_shutter_uses_monitor_signal_with_auto_monitor(omny_fast_shutter):
assert isinstance(omny_fast_shutter.shutter, MonitorSignal)
assert omny_fast_shutter.shutter.auto_monitor is True
def test_omny_fast_shutter_propagates_signal_changes_to_device_readback(omny_fast_shutter):
signal_name = omny_fast_shutter.shutter.name
callback_reads = []
def _test_cb(**kwargs):
callback_reads.append(omny_fast_shutter.read())
omny_fast_shutter.shutter.subscribe(_test_cb, event_type=omny_fast_shutter.shutter.SUB_VALUE, run=False)
omny_fast_shutter.shutter.put(1)
assert len(callback_reads) == 1
assert callback_reads[0][signal_name]["value"] == 1
assert omny_fast_shutter.read()[signal_name]["value"] == 1
assert omny_fast_shutter.fshstatus() == 1

View File

@@ -0,0 +1,241 @@
"""Module to test the pseudo_device module."""
import pytest
from bec_lib.atlas_models import Device
from ophyd_devices.sim.sim_signals import SetableSignal
from csaxs_bec.devices.pseudo_devices.bpm import BPM
from csaxs_bec.devices.pseudo_devices.bpm_control import _GAIN_TO_BITS, BPMControl
@pytest.fixture
def patched_dm(dm_with_devices):
# Patch missing current_session attribute in the device manager
dm = dm_with_devices
setattr(dm, "current_session", dm._session)
#
signal_lsb = SetableSignal(name="gain_lsb", value=0, kind="config")
signal_mid = SetableSignal(name="gain_mid", value=0, kind="config")
signal_msb = SetableSignal(name="gain_msb", value=0, kind="config")
signal_coupling = SetableSignal(name="coupling", value=0, kind="config")
signal_speed = SetableSignal(name="speed_mode", value=0, kind="config")
for signal in [signal_lsb, signal_mid, signal_msb, signal_coupling, signal_speed]:
dev_cfg = Device(
name=signal.name,
deviceClass="ophyd_devices.sim.sim_signals.SetableSignal",
enabled=True,
readoutPriority="baseline",
)
dm._session["devices"].append(dev_cfg.model_dump())
dm.devices._add_device(signal.name, signal)
return dm
@pytest.fixture
def bpm_control(patched_dm):
name = "bpm_control"
control_config = Device(
name=name,
deviceClass="csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl",
enabled=True,
readoutPriority="baseline",
deviceConfig={
"gain_lsb": "gain_lsb",
"gain_mid": "gain_mid",
"gain_msb": "gain_msb",
"coupling": "coupling",
"speed_mode": "speed_mode",
},
needs=["gain_lsb", "gain_mid", "gain_msb", "coupling", "speed_mode"],
)
patched_dm._session["devices"].append(control_config.model_dump())
try:
control = BPMControl(
name=name,
gain_lsb="gain_lsb",
gain_mid="gain_mid",
gain_msb="gain_msb",
coupling="coupling",
speed_mode="speed_mode",
device_manager=patched_dm,
)
patched_dm.devices._add_device(control.name, control)
control.wait_for_connection()
yield control
finally:
control.destroy()
def test_bpm_control_set_gain(bpm_control):
gain_lsb = bpm_control.device_manager.devices["gain_lsb"]
gain_mid = bpm_control.device_manager.devices["gain_mid"]
gain_msb = bpm_control.device_manager.devices["gain_msb"]
coupling = bpm_control.device_manager.devices["coupling"]
speed_mode = bpm_control.device_manager.devices["speed_mode"]
gain_lsb.put(0)
gain_mid.put(0)
gain_msb.put(0)
coupling.put(0)
speed_mode.put(1)
gain = bpm_control.gain.get()
assert _GAIN_TO_BITS.get(gain) == (0, 0, 0, speed_mode.get() == 1)
gain_val = 10000000
bpm_control.set_gain(gain_val)
assert _GAIN_TO_BITS.get(gain_val, ()) == (
gain_msb.get(),
gain_mid.get(),
gain_lsb.get(),
speed_mode.get(),
)
gain_val = 100000000000
bpm_control.set_gain(gain_val)
assert _GAIN_TO_BITS.get(gain_val, ()) == (
gain_msb.get(),
gain_mid.get(),
gain_lsb.get(),
speed_mode.get(),
)
with pytest.raises(ValueError):
bpm_control.set_gain(1005.0)
def test_bpm_control_set_coupling(bpm_control):
coupling = bpm_control.device_manager.devices["coupling"]
coupling.put(0)
bpm_control.coupling.get() == "AC"
coupling.put(1)
bpm_control.coupling.get() == "DC"
bpm_control.set_coupling("AC")
assert coupling.get() == 0
with pytest.raises(ValueError):
bpm_control.set_coupling("wrong")
@pytest.fixture
def patched_dm_bpm(dm_with_devices):
# Patch missing current_session attribute in the device manager
dm = dm_with_devices
setattr(dm, "current_session", dm._session)
#
left_top = SetableSignal(name="left_top", value=0, kind="config")
right_top = SetableSignal(name="right_top", value=0, kind="config")
right_bot = SetableSignal(name="right_bot", value=0, kind="config")
left_bot = SetableSignal(name="left_bot", value=0, kind="config")
for signal in [left_top, right_top, right_bot, left_bot]:
dev_cfg = Device(
name=signal.name,
deviceClass="ophyd_devices.sim.sim_signals.SetableSignal",
enabled=True,
readoutPriority="baseline",
)
dm._session["devices"].append(dev_cfg.model_dump())
dm.devices._add_device(signal.name, signal)
return dm
@pytest.fixture
def bpm(patched_dm_bpm):
name = "bpm"
bpm_config = Device(
name=name,
deviceClass="csaxs_bec.devices.pseudo_devices.bpm.BPM",
enabled=True,
readoutPriority="baseline",
deviceConfig={
"left_top": "left_top",
"right_top": "right_top",
"right_bot": "right_bot",
"left_bot": "left_bot",
},
needs=["left_top", "right_top", "right_bot", "left_bot"],
)
patched_dm_bpm._session["devices"].append(bpm_config.model_dump())
try:
bpm = BPM(
name=name,
left_top="left_top",
right_top="right_top",
right_bot="right_bot",
left_bot="left_bot",
device_manager=patched_dm_bpm,
)
patched_dm_bpm.devices._add_device(bpm.name, bpm)
bpm.wait_for_connection()
yield bpm
finally:
bpm.destroy()
def test_bpm_positions(bpm):
left_top = bpm.device_manager.devices["left_top"]
right_top = bpm.device_manager.devices["right_top"]
right_bot = bpm.device_manager.devices["right_bot"]
left_bot = bpm.device_manager.devices["left_bot"]
# Test center position
for signal in [left_top, right_top, right_bot, left_bot]:
signal.put(1)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 0
# Test fully left
left_top.put(1)
right_top.put(0)
right_bot.put(0)
left_bot.put(1)
assert bpm.pos_x.get() == -1
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == 0
assert bpm.intensity.get() == 2
# Test fully right
left_top.put(0)
right_top.put(1)
right_bot.put(1)
left_bot.put(0)
assert bpm.pos_x.get() == 1
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == 0
# Test fully top
left_top.put(1)
right_top.put(1)
right_bot.put(0)
left_bot.put(0)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 1
assert bpm.diagonal.get() == 0
# Test fully bottom
left_top.put(0)
right_top.put(0)
right_bot.put(1)
left_bot.put(1)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == -1
assert bpm.diagonal.get() == 0
# Diagonal beam
left_top.put(1)
right_top.put(0)
right_bot.put(1)
left_bot.put(0)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == -1
left_top.put(0)
right_top.put(1)
right_bot.put(0)
left_bot.put(1)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == 1