Compare commits
52 Commits
fix/fixes_
...
config_and
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ac23be094e | ||
|
|
10cc820b2c | ||
|
|
acc3fb0104 | ||
| 6a704c6dd0 | |||
| 2e014bd9ea | |||
|
|
006a451220 | ||
|
|
bdc996d3b2 | ||
|
|
2fac8bc1d7 | ||
|
|
bf045dadf1 | ||
|
|
be508cf300 | ||
|
|
f786e34a0e | ||
|
|
cceedc947a
|
||
|
|
80de9724d4
|
||
|
|
2ac02e0623
|
||
|
|
3c2a0aa484
|
||
|
|
27f4eca4ae
|
||
|
|
f2771bd4b6
|
||
|
|
546ebf8a58
|
||
|
d3f1d31bb8
|
|||
|
6d404cad12
|
|||
|
|
b67e1c012c | ||
|
|
cbbec12d9b | ||
|
|
8f4a9f025e | ||
|
|
1b9b983ab2 | ||
|
|
d7b442969a | ||
|
|
f92db3f169
|
||
|
|
55531c8a65
|
||
|
|
1d408818cc
|
||
|
|
ae2045dd10
|
||
|
|
fd4d455a5b
|
||
|
|
3411aaaeb4
|
||
|
|
d9fc3094b6 | ||
|
|
88df4781ec | ||
|
|
3b474c89c8 | ||
|
|
68cc13e1d3 | ||
|
|
700f3f9bb9 | ||
|
|
15a4d45f68 | ||
|
|
7c7f877d78 | ||
|
|
5d61d756c9 | ||
|
|
b37ae3ef57 | ||
|
|
76ed858e5c | ||
|
|
a0555def4d | ||
|
|
c1ad2fc4c3 | ||
|
|
9eee4ee1f7
|
||
|
c97b00cc8c
|
|||
|
d6a4fd37fc
|
|||
|
6d4c9d90fc
|
|||
| 87163cc3f1 | |||
| 7c17a3ae40 | |||
| 663d22fff4 | |||
| 3ca29dd0dd | |||
| 0ac37f538b |
21
.gitea/workflows/rtd_deploy.yml
Normal file
21
.gitea/workflows/rtd_deploy.yml
Normal file
@@ -0,0 +1,21 @@
|
||||
name: Read the Docs Deploy Trigger
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
trigger-rtd-webhook:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Trigger Read the Docs webhook
|
||||
env:
|
||||
RTD_TOKEN: ${{ secrets.RTD_TOKEN }}
|
||||
run: |
|
||||
curl --fail --show-error --silent \
|
||||
-X POST \
|
||||
-d "branches=${{ github.ref_name }}" \
|
||||
-d "token=${RTD_TOKEN}" \
|
||||
"https://readthedocs.org/api/v2/webhook/sls-csaxs/270162/"
|
||||
@@ -8,15 +8,14 @@ version: 2
|
||||
build:
|
||||
os: ubuntu-20.04
|
||||
tools:
|
||||
python: "3.10"
|
||||
python: "3.12"
|
||||
jobs:
|
||||
pre_install:
|
||||
- pip install .
|
||||
|
||||
|
||||
# Build documentation in the docs/ directory with Sphinx
|
||||
sphinx:
|
||||
configuration: docs/conf.py
|
||||
configuration: docs/conf.py
|
||||
|
||||
# If using Sphinx, optionally build your docs in additional formats such as PDF
|
||||
# formats:
|
||||
@@ -24,6 +23,5 @@ sphinx:
|
||||
|
||||
# Optionally declare the Python requirements required to build your docs
|
||||
python:
|
||||
install:
|
||||
- requirements: docs/requirements.txt
|
||||
|
||||
install:
|
||||
- requirements: docs/requirements.txt
|
||||
|
||||
BIN
csaxs_bec/bec_ipython_client/plugins/LamNI/LamNI.png
Normal file
BIN
csaxs_bec/bec_ipython_client/plugins/LamNI/LamNI.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 562 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 48 KiB |
@@ -0,0 +1,89 @@
|
||||
"""
|
||||
LamNI/webpage_generator.py
|
||||
===========================
|
||||
LamNI-specific webpage generator subclass.
|
||||
|
||||
Integration (inside the LamNI __init__ / startup):
|
||||
---------------------------------------------------
|
||||
from csaxs_bec.bec_ipython_client.plugins.LamNI.webpage_generator import (
|
||||
LamniWebpageGenerator,
|
||||
)
|
||||
self._webpage_gen = LamniWebpageGenerator(
|
||||
bec_client=client,
|
||||
output_dir="~/data/raw/webpage/",
|
||||
)
|
||||
self._webpage_gen.start()
|
||||
|
||||
Or use the factory (auto-selects by session name "lamni"):
|
||||
----------------------------------------------------------
|
||||
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
|
||||
make_webpage_generator,
|
||||
)
|
||||
self._webpage_gen = make_webpage_generator(bec, output_dir="~/data/raw/webpage/")
|
||||
self._webpage_gen.start()
|
||||
|
||||
Interactive helpers:
|
||||
--------------------
|
||||
lamni._webpage_gen.status()
|
||||
lamni._webpage_gen.verbosity = 2
|
||||
lamni._webpage_gen.stop()
|
||||
lamni._webpage_gen.start()
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
|
||||
WebpageGeneratorBase,
|
||||
_safe_get,
|
||||
_safe_float,
|
||||
_gvar,
|
||||
)
|
||||
|
||||
|
||||
class LamniWebpageGenerator(WebpageGeneratorBase):
|
||||
"""
|
||||
LamNI-specific webpage generator.
|
||||
Logo: LamNI.png from the same directory as this file.
|
||||
|
||||
Override _collect_setup_data() to add LamNI-specific temperatures,
|
||||
sample name, and measurement settings.
|
||||
"""
|
||||
|
||||
# TODO: fill in LamNI-specific device paths
|
||||
# label -> dotpath under device_manager.devices
|
||||
_TEMP_MAP = {
|
||||
# "Sample": "lamni_temphum.temperature_sample",
|
||||
# "OSA": "lamni_temphum.temperature_osa",
|
||||
}
|
||||
|
||||
def _logo_path(self):
|
||||
return Path(__file__).parent / "LamNI.png"
|
||||
|
||||
def _collect_setup_data(self) -> dict:
|
||||
# ── LamNI-specific data goes here ─────────────────────────────
|
||||
# Uncomment and adapt when device names are known:
|
||||
#
|
||||
# dm = self._bec.device_manager
|
||||
# sample_name = _safe_get(dm, "lamni_samples.sample_names.sample0") or "N/A"
|
||||
# temperatures = {
|
||||
# label: _safe_float(_safe_get(dm, path))
|
||||
# for label, path in self._TEMP_MAP.items()
|
||||
# }
|
||||
# settings = {
|
||||
# "Sample name": sample_name,
|
||||
# "FOV x / y": ...,
|
||||
# "Exposure time": _gvar(self._bec, "tomo_countingtime", ".3f", " s"),
|
||||
# "Angle step": _gvar(self._bec, "tomo_angle_stepsize", ".2f", "\u00b0"),
|
||||
# }
|
||||
# return {
|
||||
# "type": "lamni",
|
||||
# "sample_name": sample_name,
|
||||
# "temperatures": temperatures,
|
||||
# "settings": settings,
|
||||
# }
|
||||
|
||||
# Placeholder — returns minimal info until implemented
|
||||
return {
|
||||
"type": "lamni",
|
||||
# LamNI-specific data here
|
||||
}
|
||||
@@ -70,7 +70,7 @@ DLPCA200_AMPLIFIER_CONFIG: dict[str, dict] = {
|
||||
"rio_device": "galilrioesxbox",
|
||||
"description": "Beam Position Monitor 4 current amplifier",
|
||||
"channels": {
|
||||
"gain_lsb": 0, # Pin 10 -> Galil ch0
|
||||
"gain_lsb": rio_optics.analog_in.ch0, # Pin 10 -> Galil ch0
|
||||
"gain_mid": 1, # Pin 11 -> Galil ch1
|
||||
"gain_msb": 2, # Pin 12 -> Galil ch2
|
||||
"coupling": 3, # Pin 13 -> Galil ch3
|
||||
|
||||
BIN
csaxs_bec/bec_ipython_client/plugins/flomni/flOMNI.png
Normal file
BIN
csaxs_bec/bec_ipython_client/plugins/flomni/flOMNI.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 124 KiB |
@@ -21,6 +21,14 @@ from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import (
|
||||
TomoIDManager,
|
||||
)
|
||||
|
||||
# from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
|
||||
# FlomniWebpageGenerator,
|
||||
# VERBOSITY_SILENT, # 0 — no output
|
||||
# VERBOSITY_NORMAL, # 1 — startup / stop messages only (default)
|
||||
# VERBOSITY_VERBOSE, # 2 — one-line summary per cycle
|
||||
# VERBOSITY_DEBUG, # 3 — full JSON payload per cycle
|
||||
# )
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
@@ -28,6 +36,7 @@ if builtins.__dict__.get("bec") is not None:
|
||||
dev = builtins.__dict__.get("dev")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
|
||||
@@ -35,12 +44,15 @@ def umv(*args):
|
||||
class FlomniToolsError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class FlomniInitError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class FlomniError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
# class FlomniTools:
|
||||
# def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
|
||||
# if autoconfirm and default == "y":
|
||||
@@ -84,13 +96,16 @@ class FlomniInitStagesMixin:
|
||||
else:
|
||||
return
|
||||
|
||||
|
||||
sensor_voltage_target = dev.ftransy.user_parameter.get("sensor_voltage")
|
||||
sensor_voltage = float(dev.ftransy.controller.socket_put_and_receive("MG@AN[1]").strip())
|
||||
|
||||
if not np.isclose(sensor_voltage, sensor_voltage_target, 0.25):
|
||||
print(f"Sensor voltage of the gripper is {sensor_voltage}, while target from config is {sensor_voltage_target}")
|
||||
print("Verify that the value is acceptable and update config file. Reload config and start again.")
|
||||
print(
|
||||
f"Sensor voltage of the gripper is {sensor_voltage}, while target from config is {sensor_voltage_target}"
|
||||
)
|
||||
print(
|
||||
"Verify that the value is acceptable and update config file. Reload config and start again."
|
||||
)
|
||||
return
|
||||
|
||||
print("Starting to drive ftransy to +y limit")
|
||||
@@ -118,8 +133,9 @@ class FlomniInitStagesMixin:
|
||||
dev.feyex.limits = [-30, -1]
|
||||
print("done")
|
||||
|
||||
|
||||
if self.OMNYTools.yesno("Init of foptz. Can the stage move to the upstream limit without collision?"):
|
||||
if self.OMNYTools.yesno(
|
||||
"Init of foptz. Can the stage move to the upstream limit without collision?"
|
||||
):
|
||||
print("OK. continue.")
|
||||
else:
|
||||
return
|
||||
@@ -173,7 +189,9 @@ class FlomniInitStagesMixin:
|
||||
dev.fsamy.limits = [2, 3.1]
|
||||
print("done")
|
||||
|
||||
if self.OMNYTools.yesno("Init of tracking stages. Did you remove the outer laser flight tubes?"):
|
||||
if self.OMNYTools.yesno(
|
||||
"Init of tracking stages. Did you remove the outer laser flight tubes?"
|
||||
):
|
||||
print("OK. continue.")
|
||||
else:
|
||||
print("Stopping.")
|
||||
@@ -206,7 +224,9 @@ class FlomniInitStagesMixin:
|
||||
print("done")
|
||||
|
||||
print("Initializing UPR stage.")
|
||||
if self.OMNYTools.yesno("To ensure that the end switches work, please check that they are currently not pushed. Is everything okay?"):
|
||||
if self.OMNYTools.yesno(
|
||||
"To ensure that the end switches work, please check that they are currently not pushed. Is everything okay?"
|
||||
):
|
||||
print("OK. continue.")
|
||||
else:
|
||||
print("Stopping.")
|
||||
@@ -246,7 +266,9 @@ class FlomniInitStagesMixin:
|
||||
dev.fsamroy.limits = [-5, 365]
|
||||
print("done")
|
||||
|
||||
if self.OMNYTools.yesno("Init of foptx. Can the stage move to the positive limit without collision? Attention: tracker flight tube!"):
|
||||
if self.OMNYTools.yesno(
|
||||
"Init of foptx. Can the stage move to the positive limit without collision? Attention: tracker flight tube!"
|
||||
):
|
||||
print("OK. continue.")
|
||||
else:
|
||||
print("Stopping.")
|
||||
@@ -425,7 +447,7 @@ class FlomniSampleTransferMixin:
|
||||
def ftransfer_flomni_stage_in(self):
|
||||
time.sleep(1)
|
||||
sample_in_position = dev.flomni_samples.is_sample_slot_used(0)
|
||||
#bool(float(dev.flomni_samples.sample_placed.sample0.get()))
|
||||
# bool(float(dev.flomni_samples.sample_placed.sample0.get()))
|
||||
if not sample_in_position:
|
||||
raise FlomniError("There is no sample in the sample stage. Aborting.")
|
||||
self.reset_correction()
|
||||
@@ -440,7 +462,10 @@ class FlomniSampleTransferMixin:
|
||||
|
||||
print("Moving X-ray eye in.")
|
||||
|
||||
if self.OMNYTools.yesno("Please confirm that this is ok with the flight tube. This check is to be removed after commissioning", "n"):
|
||||
if self.OMNYTools.yesno(
|
||||
"Please confirm that this is ok with the flight tube. This check is to be removed after commissioning",
|
||||
"n",
|
||||
):
|
||||
print("OK. continue.")
|
||||
else:
|
||||
print("Stopping.")
|
||||
@@ -451,7 +476,6 @@ class FlomniSampleTransferMixin:
|
||||
self.foptics_out()
|
||||
self.xrayeye_update_frame()
|
||||
|
||||
|
||||
def laser_tracker_show_all(self):
|
||||
dev.rtx.controller.laser_tracker_show_all()
|
||||
|
||||
@@ -554,7 +578,6 @@ class FlomniSampleTransferMixin:
|
||||
|
||||
self.flomnigui_show_cameras()
|
||||
|
||||
|
||||
self.ftransfer_gripper_move(position)
|
||||
|
||||
self.ftransfer_controller_enable_mount_mode()
|
||||
@@ -594,7 +617,7 @@ class FlomniSampleTransferMixin:
|
||||
self.check_sensor_connected()
|
||||
|
||||
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
|
||||
#bool(float(dev.flomni_samples.sample_in_gripper.get()))
|
||||
# bool(float(dev.flomni_samples.sample_in_gripper.get()))
|
||||
if not sample_in_gripper:
|
||||
raise FlomniError("The gripper does not carry a sample.")
|
||||
|
||||
@@ -646,7 +669,7 @@ class FlomniSampleTransferMixin:
|
||||
|
||||
def ftransfer_sample_change(self, new_sample_position: int):
|
||||
self.check_tray_in()
|
||||
# sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
|
||||
# sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
|
||||
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
|
||||
if sample_in_gripper:
|
||||
raise FlomniError("There is already a sample in the gripper. Aborting.")
|
||||
@@ -654,28 +677,30 @@ class FlomniSampleTransferMixin:
|
||||
self.check_position_is_valid(new_sample_position)
|
||||
|
||||
if new_sample_position == 0:
|
||||
raise FlomniError("The new sample to place cannot be the sample in the sample stage. Aborting.")
|
||||
raise FlomniError(
|
||||
"The new sample to place cannot be the sample in the sample stage. Aborting."
|
||||
)
|
||||
|
||||
# sample_placed = getattr(
|
||||
# dev.flomni_samples.sample_placed, f"sample{new_sample_position}"
|
||||
# ).get()
|
||||
# sample_placed = getattr(
|
||||
# dev.flomni_samples.sample_placed, f"sample{new_sample_position}"
|
||||
# ).get()
|
||||
sample_placed = dev.flomni_samples.is_sample_slot_used(new_sample_position)
|
||||
if not sample_placed:
|
||||
raise FlomniError(
|
||||
f"There is currently no sample in position [{new_sample_position}]. Aborting."
|
||||
)
|
||||
|
||||
# sample_in_sample_stage = dev.flomni_samples.sample_placed.sample0.get()
|
||||
# sample_in_sample_stage = dev.flomni_samples.sample_placed.sample0.get()
|
||||
sample_in_sample_stage = dev.flomni_samples.is_sample_slot_used(0)
|
||||
if sample_in_sample_stage:
|
||||
# find a new home for the sample...
|
||||
empty_slots = []
|
||||
# for name, val in dev.flomni_samples.read().items():
|
||||
# if "flomni_samples_sample_placed_sample" not in name:
|
||||
# continue
|
||||
# if val.get("value") == 0:
|
||||
# empty_slots.append(int(name.split("flomni_samples_sample_placed_sample")[1]))
|
||||
for j in range(1,20):
|
||||
# for name, val in dev.flomni_samples.read().items():
|
||||
# if "flomni_samples_sample_placed_sample" not in name:
|
||||
# continue
|
||||
# if val.get("value") == 0:
|
||||
# empty_slots.append(int(name.split("flomni_samples_sample_placed_sample")[1]))
|
||||
for j in range(1, 20):
|
||||
if not dev.flomni_samples.is_sample_slot_used(j):
|
||||
empty_slots.append(j)
|
||||
if not empty_slots:
|
||||
@@ -761,7 +786,9 @@ class FlomniSampleTransferMixin:
|
||||
dev.ftransy.controller.socket_put_confirmed("confirm=1")
|
||||
else:
|
||||
print("Stopping.")
|
||||
exit
|
||||
raise FlomniError(
|
||||
"User abort sample transfer."
|
||||
)
|
||||
|
||||
def ftransfer_gripper_is_open(self) -> bool:
|
||||
status = bool(float(dev.ftransy.controller.socket_put_and_receive("MG @OUT[9]").strip()))
|
||||
@@ -769,7 +796,7 @@ class FlomniSampleTransferMixin:
|
||||
|
||||
def ftransfer_gripper_open(self):
|
||||
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
|
||||
#dev.flomni_samples.sample_in_gripper.get()
|
||||
# dev.flomni_samples.sample_in_gripper.get()
|
||||
if sample_in_gripper:
|
||||
raise FlomniError(
|
||||
"Cannot open gripper. There is still a sample in the gripper! Aborting."
|
||||
@@ -784,13 +811,17 @@ class FlomniSampleTransferMixin:
|
||||
def ftransfer_gripper_move(self, position: int):
|
||||
self.check_position_is_valid(position)
|
||||
|
||||
self._ftransfer_shiftx = -0.2
|
||||
#this is not used for sample stage position!
|
||||
self._ftransfer_shiftx = -0.15
|
||||
self._ftransfer_shiftz = -0.5
|
||||
|
||||
fsamx_pos = dev.fsamx.readback.get()
|
||||
if position == 0 and fsamx_pos > -160:
|
||||
|
||||
if self.OMNYTools.yesno("May the flomni stage be moved out for the sample change? Feedback will be disabled and alignment will be lost!", "y"):
|
||||
if self.OMNYTools.yesno(
|
||||
"May the flomni stage be moved out for the sample change? Feedback will be disabled and alignment will be lost!",
|
||||
"y",
|
||||
):
|
||||
print("OK. continue.")
|
||||
self.ftransfer_flomni_stage_out()
|
||||
else:
|
||||
@@ -801,7 +832,7 @@ class FlomniSampleTransferMixin:
|
||||
self.check_tray_in()
|
||||
|
||||
if position == 0:
|
||||
umv(dev.ftransx, 10.715 + 0.2, dev.ftransz, 3.5950)
|
||||
umv(dev.ftransx, 11, dev.ftransz, 3.5950)
|
||||
if position == 1:
|
||||
umv(
|
||||
dev.ftransx,
|
||||
@@ -946,8 +977,6 @@ class FlomniSampleTransferMixin:
|
||||
|
||||
class FlomniAlignmentMixin:
|
||||
import csaxs_bec
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
# Ensure this is a Path object, not a string
|
||||
csaxs_bec_basepath = Path(csaxs_bec.__file__)
|
||||
@@ -956,10 +985,13 @@ class FlomniAlignmentMixin:
|
||||
|
||||
# Build the absolute path correctly
|
||||
default_correction_file = (
|
||||
csaxs_bec_basepath.parent / 'bec_ipython_client' / 'plugins' / 'flomni' / default_correction_file_rel
|
||||
csaxs_bec_basepath.parent
|
||||
/ "bec_ipython_client"
|
||||
/ "plugins"
|
||||
/ "flomni"
|
||||
/ default_correction_file_rel
|
||||
).resolve()
|
||||
|
||||
|
||||
def reset_correction(self, use_default_correction=True):
|
||||
"""
|
||||
Reset the correction to the default values.
|
||||
@@ -1031,12 +1063,12 @@ class FlomniAlignmentMixin:
|
||||
else:
|
||||
params = dev.omny_xray_gui.fit_params_x.get()
|
||||
|
||||
#amplitude
|
||||
tomo_alignment_fit[0][0] = params['SineModel_0_amplitude']
|
||||
#phase
|
||||
tomo_alignment_fit[0][1] = params['SineModel_0_shift']
|
||||
#offset
|
||||
tomo_alignment_fit[0][2] = params['LinearModel_1_intercept']
|
||||
# amplitude
|
||||
tomo_alignment_fit[0][0] = params["SineModel_0_amplitude"]
|
||||
# phase
|
||||
tomo_alignment_fit[0][1] = params["SineModel_0_shift"]
|
||||
# offset
|
||||
tomo_alignment_fit[0][2] = params["LinearModel_1_intercept"]
|
||||
print("applying vertical default values from mirror calibration, not from fit!")
|
||||
tomo_alignment_fit[1][0] = 0
|
||||
tomo_alignment_fit[1][1] = 0
|
||||
@@ -1045,7 +1077,6 @@ class FlomniAlignmentMixin:
|
||||
tomo_alignment_fit[1][4] = 0
|
||||
print("New alignment parameters loaded based on Xray eye alignment GUI:")
|
||||
|
||||
|
||||
print(
|
||||
f"X Amplitude {tomo_alignment_fit[0][0]}, "
|
||||
f"X Phase {tomo_alignment_fit[0][1]}, "
|
||||
@@ -1186,13 +1217,83 @@ class FlomniAlignmentMixin:
|
||||
return additional_correction_shift
|
||||
|
||||
|
||||
class _ProgressProxy:
|
||||
"""Dict-like proxy that persists the flOMNI progress dict as a BEC global variable.
|
||||
|
||||
Every read (`proxy["key"]`) fetches the current dict from the global var store,
|
||||
and every write (`proxy["key"] = val`) fetches, updates, and saves it back.
|
||||
This makes the progress state visible to all BEC client sessions via
|
||||
``client.get_global_var("tomo_progress")``.
|
||||
"""
|
||||
|
||||
_GLOBAL_VAR_KEY = "tomo_progress"
|
||||
_DEFAULTS: dict = {
|
||||
"subtomo": 0,
|
||||
"subtomo_projection": 0,
|
||||
"subtomo_total_projections": 1,
|
||||
"projection": 0,
|
||||
"total_projections": 1,
|
||||
"angle": 0,
|
||||
"tomo_type": 0,
|
||||
"tomo_start_time": None,
|
||||
"estimated_remaining_time": None,
|
||||
"heartbeat": None,
|
||||
}
|
||||
|
||||
def __init__(self, client):
|
||||
self._client = client
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ------------------------------------------------------------------
|
||||
def _load(self) -> dict:
|
||||
val = self._client.get_global_var(self._GLOBAL_VAR_KEY)
|
||||
if val is None:
|
||||
return dict(self._DEFAULTS)
|
||||
return val
|
||||
|
||||
def _save(self, data: dict) -> None:
|
||||
self._client.set_global_var(self._GLOBAL_VAR_KEY, data)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Dict-like interface
|
||||
# ------------------------------------------------------------------
|
||||
def __getitem__(self, key):
|
||||
return self._load()[key]
|
||||
|
||||
def __setitem__(self, key, value) -> None:
|
||||
data = self._load()
|
||||
data[key] = value
|
||||
self._save(data)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"{self.__class__.__name__}({self._load()!r})"
|
||||
|
||||
def get(self, key, default=None):
|
||||
return self._load().get(key, default)
|
||||
|
||||
def update(self, *args, **kwargs) -> None:
|
||||
"""Update multiple fields in a single round-trip."""
|
||||
data = self._load()
|
||||
data.update(*args, **kwargs)
|
||||
self._save(data)
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset all progress fields to their default values."""
|
||||
self._save(dict(self._DEFAULTS))
|
||||
|
||||
def as_dict(self) -> dict:
|
||||
"""Return a plain copy of the current progress state."""
|
||||
return self._load()
|
||||
|
||||
|
||||
class Flomni(
|
||||
FlomniInitStagesMixin,
|
||||
FlomniSampleTransferMixin,
|
||||
FlomniAlignmentMixin,
|
||||
FlomniOpticsMixin,
|
||||
cSAXSBeamlineChecks,
|
||||
flomniGuiTools
|
||||
flomniGuiTools,
|
||||
):
|
||||
def __init__(self, client):
|
||||
super().__init__()
|
||||
@@ -1208,14 +1309,20 @@ class Flomni(
|
||||
self.corr_angle_y = []
|
||||
self.corr_pos_y_2 = []
|
||||
self.corr_angle_y_2 = []
|
||||
self.progress = {}
|
||||
self.progress["subtomo"] = 0
|
||||
self.progress["subtomo_projection"] = 0
|
||||
self.progress["subtomo_total_projections"] = 1
|
||||
self.progress["projection"] = 0
|
||||
self.progress["total_projections"] = 1
|
||||
self.progress["angle"] = 0
|
||||
self.progress["tomo_type"] = 0
|
||||
self._progress_proxy = _ProgressProxy(self.client)
|
||||
self._progress_proxy.reset()
|
||||
from csaxs_bec.bec_ipython_client.plugins.flomni.flomni_webpage_generator import (
|
||||
FlomniWebpageGenerator,
|
||||
)
|
||||
self._webpage_gen = FlomniWebpageGenerator(
|
||||
bec_client=client,
|
||||
output_dir="~/data/raw/webpage/",
|
||||
#upload_url="http://s1090968537.online.de/upload.php", # optional
|
||||
upload_url="https://v1p0zyg2w9n2k9c1.myfritz.net/upload.php",
|
||||
local_port=8080
|
||||
)
|
||||
self._webpage_gen.start()
|
||||
|
||||
self.OMNYTools = OMNYTools(self.client)
|
||||
self.reconstructor = PtychoReconstructor(self.ptycho_reconstruct_foldername)
|
||||
self.tomo_id_manager = TomoIDManager()
|
||||
@@ -1224,7 +1331,10 @@ class Flomni(
|
||||
|
||||
def start_x_ray_eye_alignment(self, keep_shutter_open=False):
|
||||
|
||||
if self.OMNYTools.yesno("Starting Xrayeye alignment. Deleting any potential existing alignment for this sample.", "y"):
|
||||
if self.OMNYTools.yesno(
|
||||
"Starting Xrayeye alignment. Deleting any potential existing alignment for this sample.",
|
||||
"y",
|
||||
):
|
||||
self.align = XrayEyeAlign(self.client, self)
|
||||
try:
|
||||
self.align.align(keep_shutter_open)
|
||||
@@ -1236,7 +1346,7 @@ class Flomni(
|
||||
umv(dev.fsamx, fsamx_in)
|
||||
raise exc
|
||||
|
||||
def xrayeye_update_frame(self,keep_shutter_open=False):
|
||||
def xrayeye_update_frame(self, keep_shutter_open=False):
|
||||
self.align.update_frame(keep_shutter_open)
|
||||
|
||||
def xrayeye_alignment_start(self, keep_shutter_open=False):
|
||||
@@ -1268,6 +1378,42 @@ class Flomni(
|
||||
self.special_angles = []
|
||||
self.special_angle_repeats = 1
|
||||
|
||||
@property
|
||||
def progress(self) -> _ProgressProxy:
|
||||
"""Proxy dict backed by the BEC global variable ``tomo_progress``.
|
||||
|
||||
Readable from any BEC client session via::
|
||||
|
||||
client.get_global_var("tomo_progress")
|
||||
|
||||
Individual fields can be read and written just like a regular dict::
|
||||
|
||||
flomni.progress["projection"] # read
|
||||
flomni.progress["projection"] = 42 # write (persists immediately)
|
||||
|
||||
To update multiple fields atomically use :py:meth:`_ProgressProxy.update`::
|
||||
|
||||
flomni.progress.update(projection=42, angle=90.0)
|
||||
|
||||
To reset all fields to their defaults::
|
||||
|
||||
flomni.progress.reset()
|
||||
"""
|
||||
return self._progress_proxy
|
||||
|
||||
@progress.setter
|
||||
def progress(self, val: dict) -> None:
|
||||
"""Replace the entire progress dict.
|
||||
|
||||
Accepts a plain :class:`dict` and persists it to the global var store.
|
||||
Example::
|
||||
|
||||
flomni.progress = {"projection": 0, "total_projections": 100, ...}
|
||||
"""
|
||||
if not isinstance(val, dict):
|
||||
raise TypeError(f"progress must be a dict, got {type(val).__name__!r}")
|
||||
self._progress_proxy._save(val)
|
||||
|
||||
@property
|
||||
def tomo_shellstep(self):
|
||||
val = self.client.get_global_var("tomo_shellstep")
|
||||
@@ -1454,21 +1600,11 @@ class Flomni(
|
||||
def sample_name(self):
|
||||
return self.sample_get_name(0)
|
||||
|
||||
def write_to_scilog(self, content, tags: list = None):
|
||||
try:
|
||||
if tags is not None:
|
||||
tags.append("BEC")
|
||||
else:
|
||||
tags = ["BEC"]
|
||||
msg = bec.logbook.LogbookMessage()
|
||||
msg.add_text(content).add_tag(tags)
|
||||
self.client.logbook.send_logbook_message(msg)
|
||||
except Exception:
|
||||
logger.warning("Failed to write to scilog.")
|
||||
|
||||
def tomo_alignment_scan(self):
|
||||
"""
|
||||
Performs a tomogram alignment scan.
|
||||
Collects all scan numbers acquired during the alignment, prints them at the end,
|
||||
and creates a BEC scilog text entry summarising the alignment scan numbers.
|
||||
"""
|
||||
if self.get_alignment_offset(0) == (0, 0, 0):
|
||||
print("It appears that the xrayeye alignemtn was not performend or loaded. Aborting.")
|
||||
@@ -1476,14 +1612,11 @@ class Flomni(
|
||||
dev = builtins.__dict__.get("dev")
|
||||
bec = builtins.__dict__.get("bec")
|
||||
|
||||
|
||||
self.feye_out()
|
||||
tags = ["BEC_alignment_tomo", self.sample_name]
|
||||
self.write_to_scilog(
|
||||
f"Starting alignment scan. First scan number: {bec.queue.next_scan_number}.", tags
|
||||
)
|
||||
|
||||
|
||||
start_angle = 0
|
||||
alignment_scan_numbers = []
|
||||
|
||||
angle_end = start_angle + 180
|
||||
for angle in np.linspace(start_angle, angle_end, num=int(180 / 45) + 1, endpoint=True):
|
||||
@@ -1495,7 +1628,6 @@ class Flomni(
|
||||
try:
|
||||
start_scan_number = bec.queue.next_scan_number
|
||||
self.tomo_scan_projection(angle)
|
||||
self.tomo_reconstruct()
|
||||
error_caught = False
|
||||
except AlarmBase as exc:
|
||||
if exc.alarm_type == "TimeoutError":
|
||||
@@ -1504,28 +1636,33 @@ class Flomni(
|
||||
error_caught = True
|
||||
else:
|
||||
raise exc
|
||||
#todo here was if blchk success, then setting to success true
|
||||
# todo here was if blchk success, then setting to success true
|
||||
successful = True
|
||||
|
||||
|
||||
end_scan_number = bec.queue.next_scan_number
|
||||
for scan_nr in range(start_scan_number, end_scan_number):
|
||||
self._write_tomo_scan_number(scan_nr, angle, 0)
|
||||
#self._write_tomo_scan_number(scan_nr, angle, 0)
|
||||
alignment_scan_numbers.append(scan_nr)
|
||||
|
||||
umv(dev.fsamroy, 0)
|
||||
self.OMNYTools.printgreenbold("\n\nAlignment scan finished. Please run SPEC_ptycho_align and load the new fit.")
|
||||
|
||||
def _write_subtomo_to_scilog(self, subtomo_number):
|
||||
dev = builtins.__dict__.get("dev")
|
||||
bec = builtins.__dict__.get("bec")
|
||||
if self.tomo_id > 0:
|
||||
tags = ["BEC_subtomo", self.sample_name, f"tomo_id_{self.tomo_id}"]
|
||||
else:
|
||||
tags = ["BEC_subtomo", self.sample_name]
|
||||
self.write_to_scilog(
|
||||
f"Starting subtomo: {subtomo_number}. First scan number: {bec.queue.next_scan_number}.",
|
||||
tags,
|
||||
self.OMNYTools.printgreenbold(
|
||||
"\n\nAlignment scan finished. Please run SPEC_ptycho_align and load the new fit by flomni.read_alignment_offset() ."
|
||||
)
|
||||
|
||||
# summary of alignment scan numbers
|
||||
scan_list_str = ", ".join(str(s) for s in alignment_scan_numbers)
|
||||
#print(f"\nAlignment scan numbers ({len(alignment_scan_numbers)} total): {scan_list_str}")
|
||||
|
||||
# BEC scilog entry (no logo)
|
||||
scilog_content = (
|
||||
f"Alignment scan finished.\n"
|
||||
f"Sample: {self.sample_name}\n"
|
||||
f"Number of alignment scans: {len(alignment_scan_numbers)}\n"
|
||||
f"Alignment scan numbers: {scan_list_str}\n"
|
||||
)
|
||||
print(scliog_content)
|
||||
bec.messaging.scilog.new().add_text(scilog_content.replace("\n", "<br>")).add_tags("alignmentscan").send()
|
||||
|
||||
def sub_tomo_scan(self, subtomo_number, start_angle=None):
|
||||
"""
|
||||
Performs a sub tomogram scan.
|
||||
@@ -1533,18 +1670,6 @@ class Flomni(
|
||||
subtomo_number (int): The sub tomogram number.
|
||||
start_angle (float, optional): The start angle of the scan. Defaults to None.
|
||||
"""
|
||||
# dev = builtins.__dict__.get("dev")
|
||||
# bec = builtins.__dict__.get("bec")
|
||||
# if self.tomo_id > 0:
|
||||
# tags = ["BEC_subtomo", self.sample_name, f"tomo_id_{self.tomo_id}"]
|
||||
# else:
|
||||
# tags = ["BEC_subtomo", self.sample_name]
|
||||
# self.write_to_scilog(
|
||||
# f"Starting subtomo: {subtomo_number}. First scan number: {bec.queue.next_scan_number}.",
|
||||
# tags,
|
||||
# )
|
||||
|
||||
self._write_subtomo_to_scilog(subtomo_number)
|
||||
|
||||
if start_angle is not None:
|
||||
print(f"Sub tomo scan with start angle {start_angle} requested.")
|
||||
@@ -1567,20 +1692,19 @@ class Flomni(
|
||||
elif subtomo_number == 8:
|
||||
start_angle = self.tomo_angle_stepsize / 8.0 * 7
|
||||
|
||||
|
||||
# _tomo_shift_angles (potential global variable)
|
||||
_tomo_shift_angles = 0
|
||||
# compute number of projections
|
||||
|
||||
start = start_angle + _tomo_shift_angles
|
||||
|
||||
if subtomo_number % 2: # odd = forward
|
||||
if subtomo_number % 2: # odd = forward
|
||||
max_allowed_angle = 180.05 + self.tomo_angle_stepsize
|
||||
proposed_end = start + 180
|
||||
angle_end = min(proposed_end, max_allowed_angle)
|
||||
span = angle_end - start
|
||||
|
||||
else: # even = reverse
|
||||
else: # even = reverse
|
||||
min_allowed_angle = 0
|
||||
proposed_end = start - 180
|
||||
angle_end = max(proposed_end, min_allowed_angle)
|
||||
@@ -1589,38 +1713,23 @@ class Flomni(
|
||||
# number of projections needed to maintain step size
|
||||
N = int(span / self.tomo_angle_stepsize) + 1
|
||||
|
||||
angles = np.linspace(
|
||||
start,
|
||||
angle_end,
|
||||
num=N,
|
||||
endpoint=True,
|
||||
)
|
||||
angles = np.linspace(start, angle_end, num=N, endpoint=True)
|
||||
|
||||
if subtomo_number % 2: # odd subtomos → forward direction
|
||||
if subtomo_number % 2: # odd subtomos → forward direction
|
||||
# clamp end angle to max allowed
|
||||
max_allowed_angle = 180.05 + self.tomo_angle_stepsize
|
||||
proposed_end = start + 180
|
||||
angle_end = min(proposed_end, max_allowed_angle)
|
||||
|
||||
angles = np.linspace(
|
||||
start,
|
||||
angle_end,
|
||||
num=N,
|
||||
endpoint=True,
|
||||
)
|
||||
angles = np.linspace(start, angle_end, num=N, endpoint=True)
|
||||
|
||||
else: # even subtomos → reverse direction
|
||||
else: # even subtomos → reverse direction
|
||||
# go FROM start_angle down toward 0
|
||||
min_allowed_angle = 0
|
||||
proposed_end = start - 180
|
||||
angle_end = max(proposed_end, min_allowed_angle)
|
||||
|
||||
angles = np.linspace(
|
||||
start,
|
||||
angle_end,
|
||||
num=N,
|
||||
endpoint=True,
|
||||
)
|
||||
angles = np.linspace(start, angle_end, num=N, endpoint=True)
|
||||
|
||||
for i, angle in enumerate(angles):
|
||||
|
||||
@@ -1636,9 +1745,9 @@ class Flomni(
|
||||
self._subtomo_offset = 0
|
||||
|
||||
else:
|
||||
if subtomo_number % 2: # odd = forward direction
|
||||
if subtomo_number % 2: # odd = forward direction
|
||||
self._subtomo_offset = round(sa / step)
|
||||
else: # even = reverse direction
|
||||
else: # even = reverse direction
|
||||
self._subtomo_offset = round((180 - sa) / step)
|
||||
|
||||
# progress index must always increase
|
||||
@@ -1647,22 +1756,24 @@ class Flomni(
|
||||
|
||||
# existing progress fields
|
||||
self.progress["subtomo_total_projections"] = int(180 / self.tomo_angle_stepsize)
|
||||
self.progress["projection"] = (subtomo_number - 1) * self.progress["subtomo_total_projections"] + self.progress["subtomo_projection"]
|
||||
self.progress["projection"] = (subtomo_number - 1) * self.progress[
|
||||
"subtomo_total_projections"
|
||||
] + self.progress["subtomo_projection"]
|
||||
self.progress["total_projections"] = 180 / self.tomo_angle_stepsize * 8
|
||||
self.progress["angle"] = angle
|
||||
|
||||
# finally do the scan at this angle
|
||||
self._tomo_scan_at_angle(angle, subtomo_number)
|
||||
|
||||
|
||||
def _tomo_scan_at_angle(self, angle, subtomo_number):
|
||||
successful = False
|
||||
error_caught = False
|
||||
if 0 <= angle < 180.05:
|
||||
self.progress["heartbeat"] = datetime.datetime.now().isoformat()
|
||||
print(f"Starting flOMNI scan for angle {angle} in subtomo {subtomo_number}")
|
||||
self._print_progress()
|
||||
while not successful:
|
||||
#self.bl_chk._bl_chk_start()
|
||||
# self.bl_chk._bl_chk_start()
|
||||
if not self.special_angles:
|
||||
self._current_special_angles = []
|
||||
if self._current_special_angles:
|
||||
@@ -1697,12 +1808,14 @@ class Flomni(
|
||||
"""start a tomo scan"""
|
||||
|
||||
if not self._check_eye_out_and_optics_in():
|
||||
print("Attention: The setup is not in measurement condition.\nXray eye might be IN or the Xray optics OUT.")
|
||||
print(
|
||||
"Attention: The setup is not in measurement condition.\nXray eye might be IN or the Xray optics OUT."
|
||||
)
|
||||
if self.OMNYTools.yesno("Shall I continue?", "n"):
|
||||
print("OK")
|
||||
else:
|
||||
print("Stopping.")
|
||||
return
|
||||
print("OK")
|
||||
else:
|
||||
print("Stopping.")
|
||||
return
|
||||
|
||||
self.flomnigui_show_progress()
|
||||
|
||||
@@ -1716,20 +1829,22 @@ class Flomni(
|
||||
or (self.tomo_type == 3 and projection_number == None)
|
||||
):
|
||||
|
||||
# pylint: disable=undefined-variable
|
||||
# if bec.active_account != "":
|
||||
# self.tomo_id = self.add_sample_database(
|
||||
# self.sample_name,
|
||||
# str(datetime.date.today()),
|
||||
# bec.active_account,
|
||||
# bec.queue.next_scan_number,
|
||||
# "flomni",
|
||||
# "test additional info",
|
||||
# "BEC",
|
||||
# )
|
||||
# self.write_pdf_report()
|
||||
# else:
|
||||
self.tomo_id = 0
|
||||
#pylint: disable=undefined-variable
|
||||
if bec.active_account != "":
|
||||
self.tomo_id = self.add_sample_database(
|
||||
self.sample_name,
|
||||
str(datetime.date.today()),
|
||||
bec.active_account,
|
||||
bec.queue.next_scan_number,
|
||||
"flomni",
|
||||
"test additional info",
|
||||
"BEC",
|
||||
)
|
||||
self.write_pdf_report()
|
||||
else:
|
||||
self.tomo_id = 0
|
||||
self.write_pdf_report()
|
||||
self.progress["tomo_start_time"] = datetime.datetime.now().isoformat()
|
||||
|
||||
with scans.dataset_id_on_hold:
|
||||
if self.tomo_type == 1:
|
||||
@@ -1738,7 +1853,7 @@ class Flomni(
|
||||
for ii in range(subtomo_start, 9):
|
||||
self.sub_tomo_scan(ii, start_angle=start_angle)
|
||||
start_angle = None
|
||||
|
||||
|
||||
elif self.tomo_type == 2:
|
||||
# Golden ratio tomography
|
||||
previous_subtomo_number = -1
|
||||
@@ -1749,7 +1864,6 @@ class Flomni(
|
||||
while True:
|
||||
angle, subtomo_number = self._golden(ii, self.golden_ratio_bunch_size, 180, 1)
|
||||
if previous_subtomo_number != subtomo_number:
|
||||
self._write_subtomo_to_scilog(subtomo_number)
|
||||
if (
|
||||
subtomo_number % 2 == 1
|
||||
and ii > 10
|
||||
@@ -1797,7 +1911,6 @@ class Flomni(
|
||||
ii, int(180 / self.tomo_angle_stepsize), 180, 1, 0
|
||||
)
|
||||
if previous_subtomo_number != subtomo_number:
|
||||
self._write_subtomo_to_scilog(subtomo_number)
|
||||
if (
|
||||
subtomo_number % 2 == 1
|
||||
and ii > 10
|
||||
@@ -1834,19 +1947,47 @@ class Flomni(
|
||||
else:
|
||||
raise FlomniError("undefined tomo type")
|
||||
|
||||
self.progress['projection'] = self.progress['total_projections']
|
||||
self.progress["projection"] = self.progress["total_projections"]
|
||||
self.progress["subtomo_projection"] = self.progress["subtomo_total_projections"]
|
||||
self._print_progress()
|
||||
self._print_progress()
|
||||
self.OMNYTools.printgreenbold("Tomoscan finished")
|
||||
|
||||
@staticmethod
|
||||
def _format_duration(seconds: float) -> str:
|
||||
"""Format a duration in seconds as a human-readable string, e.g. '2h 03m 15s'."""
|
||||
seconds = int(seconds)
|
||||
h, remainder = divmod(seconds, 3600)
|
||||
m, s = divmod(remainder, 60)
|
||||
if h > 0:
|
||||
return f"{h}h {m:02d}m {s:02d}s"
|
||||
if m > 0:
|
||||
return f"{m}m {s:02d}s"
|
||||
return f"{s}s"
|
||||
|
||||
def _print_progress(self):
|
||||
# --- compute and store estimated remaining time -----------------------
|
||||
start_str = self.progress.get("tomo_start_time")
|
||||
projection = self.progress["projection"]
|
||||
total = self.progress["total_projections"]
|
||||
if start_str is not None and total > 0 and projection > 9:
|
||||
elapsed = (
|
||||
datetime.datetime.now() - datetime.datetime.fromisoformat(start_str)
|
||||
).total_seconds()
|
||||
rate = projection / elapsed # projections per second
|
||||
remaining_s = (total - projection) / rate
|
||||
self.progress["estimated_remaining_time"] = remaining_s
|
||||
eta_str = self._format_duration(remaining_s)
|
||||
else:
|
||||
eta_str = "N/A"
|
||||
# ----------------------------------------------------------------------
|
||||
print("\x1b[95mProgress report:")
|
||||
print(f"Tomo type: ....................... {self.progress['tomo_type']}")
|
||||
print(f"Projection: ...................... {self.progress['projection']:.0f}")
|
||||
print(f"Total projections expected ....... {self.progress['total_projections']}")
|
||||
print(f"Angle: ........................... {self.progress['angle']}")
|
||||
print(f"Current subtomo: ................. {self.progress['subtomo']}")
|
||||
print(f"Current projection within subtomo: {self.progress['subtomo_projection']}\x1b[0m")
|
||||
print(f"Current projection within subtomo: {self.progress['subtomo_projection']}")
|
||||
print(f"Estimated remaining time: ........ {eta_str}\x1b[0m")
|
||||
self._flomnigui_update_progress()
|
||||
|
||||
def add_sample_database(
|
||||
@@ -1870,7 +2011,6 @@ class Flomni(
|
||||
return
|
||||
|
||||
self.tomo_scan_projection(angle)
|
||||
self.tomo_reconstruct()
|
||||
|
||||
def _golden(self, ii, howmany_sorted, maxangle, reverse=False):
|
||||
"""returns the iis golden ratio angle of sorted bunches of howmany_sorted and its subtomo number"""
|
||||
@@ -1930,9 +2070,7 @@ class Flomni(
|
||||
)
|
||||
|
||||
def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None:
|
||||
tomo_scan_numbers_file = os.path.expanduser(
|
||||
"~/data/raw/logs/tomography_scannumbers.txt"
|
||||
)
|
||||
tomo_scan_numbers_file = os.path.expanduser("~/data/raw/logs/tomography_scannumbers.txt")
|
||||
with open(tomo_scan_numbers_file, "a+") as out_file:
|
||||
# pylint: disable=undefined-variable
|
||||
out_file.write(
|
||||
@@ -1943,7 +2081,6 @@ class Flomni(
|
||||
|
||||
dev.rtx.controller.laser_tracker_check_signalstrength()
|
||||
|
||||
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
# additional_correction = self.align.compute_additional_correction(angle)
|
||||
@@ -1978,7 +2115,7 @@ class Flomni(
|
||||
f"{str(datetime.datetime.now())}: flomni scan projection at angle {angle}, scan"
|
||||
f" number {bec.queue.next_scan_number}.\n"
|
||||
)
|
||||
# self.write_to_scilog(log_message, ["BEC_scans", self.sample_name])
|
||||
|
||||
scans.flomni_fermat_scan(
|
||||
fovx=self.fovx,
|
||||
fovy=self.fovy,
|
||||
@@ -1991,6 +2128,9 @@ class Flomni(
|
||||
corridor_size=corridor_size,
|
||||
)
|
||||
|
||||
self.tomo_reconstruct()
|
||||
|
||||
|
||||
def tomo_parameters(self):
|
||||
"""print and update the tomo parameters"""
|
||||
print("Current settings:")
|
||||
@@ -2033,7 +2173,6 @@ class Flomni(
|
||||
)
|
||||
print(f"\nSample name: {self.sample_name}\n")
|
||||
|
||||
|
||||
if self.OMNYTools.yesno("Are these parameters correctly set for your scan?", "y"):
|
||||
print("... excellent!")
|
||||
else:
|
||||
@@ -2130,19 +2269,21 @@ class Flomni(
|
||||
+ ' 888 888 "Y88888P" 888 888 888 Y888 8888888 \n'
|
||||
)
|
||||
padding = 20
|
||||
fovxy = f"{self.fovx:.2f}/{self.fovy:.2f}"
|
||||
stitching = f"{self.stitch_x:.2f}/{self.stitch_y:.2f}"
|
||||
fovxy = f"{self.fovx:.1f}/{self.fovy:.1f}"
|
||||
stitching = f"{self.stitch_x:.0f}/{self.stitch_y:.0f}"
|
||||
dataset_id = str(self.client.queue.next_dataset_number)
|
||||
account = bec.active_account
|
||||
content = [
|
||||
f"{'Sample Name:':<{padding}}{self.sample_name:>{padding}}\n",
|
||||
f"{'Measurement ID:':<{padding}}{str(self.tomo_id):>{padding}}\n",
|
||||
f"{'Dataset ID:':<{padding}}{dataset_id:>{padding}}\n",
|
||||
f"{'Sample Info:':<{padding}}{'Sample Info':>{padding}}\n",
|
||||
f"{'e-account:':<{padding}}{str(self.client.username):>{padding}}\n",
|
||||
f"{'e-account:':<{padding}}{str(account):>{padding}}\n",
|
||||
f"{'Number of projections:':<{padding}}{int(180 / self.tomo_angle_stepsize * 8):>{padding}}\n",
|
||||
f"{'First scan number:':<{padding}}{self.client.queue.next_scan_number:>{padding}}\n",
|
||||
f"{'Last scan number approx.:':<{padding}}{self.client.queue.next_scan_number + int(180 / self.tomo_angle_stepsize * 8) + 10:>{padding}}\n",
|
||||
f"{'Current photon energy:':<{padding}}{dev.mokev.read()['mokev']['value']:>{padding}.4f}\n",
|
||||
f"{'Current photon energy:':<{padding}}To be implemented\n",
|
||||
#f"{'Current photon energy:':<{padding}}{dev.mokev.read()['mokev']['value']:>{padding}.4f}\n",
|
||||
f"{'Exposure time:':<{padding}}{self.tomo_countingtime:>{padding}.2f}\n",
|
||||
f"{'Fermat spiral step size:':<{padding}}{self.tomo_shellstep:>{padding}.2f}\n",
|
||||
f"{'FOV:':<{padding}}{fovxy:>{padding}}\n",
|
||||
@@ -2151,20 +2292,38 @@ class Flomni(
|
||||
f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n",
|
||||
]
|
||||
content = "".join(content)
|
||||
user_target = os.path.expanduser(f"~/Data10/documentation/tomo_scan_ID_{self.tomo_id}.pdf")
|
||||
user_target = os.path.expanduser(f"~/data/raw/documentation/tomo_scan_ID_{self.tomo_id}.pdf")
|
||||
with PDFWriter(user_target) as file:
|
||||
file.write(header)
|
||||
file.write(content)
|
||||
subprocess.run(
|
||||
"xterm /work/sls/spec/local/XOMNY/bin/upload/upload_last_pon.sh &", shell=True
|
||||
)
|
||||
# subprocess.run(
|
||||
# "xterm /work/sls/spec/local/XOMNY/bin/upload/upload_last_pon.sh &", shell=True
|
||||
# )
|
||||
# status = subprocess.run(f"cp /tmp/spec-e20131-specES1.pdf {user_target}", shell=True)
|
||||
msg = bec.logbook.LogbookMessage()
|
||||
logo_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LamNI_logo.png")
|
||||
msg.add_file(logo_path).add_text("".join(content).replace("\n", "</p><p>")).add_tag(
|
||||
["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "LamNI", self.sample_name]
|
||||
)
|
||||
self.client.logbook.send_logbook_message(msg)
|
||||
# msg = bec.tomo_progress.tomo_progressMessage()
|
||||
# logo_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LamNI_logo.png")
|
||||
# msg.add_file(logo_path).add_text("".join(content).replace("\n", "</p><p>")).add_tag(
|
||||
# ["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "flOMNI", self.sample_name]
|
||||
# )
|
||||
# self.client.tomo_progress.send_tomo_progress_message("~/data/raw/documentation/tomo_scan_ID_{self.tomo_id}.pdf").send()
|
||||
import csaxs_bec
|
||||
|
||||
|
||||
# Ensure this is a Path object, not a string
|
||||
csaxs_bec_basepath = Path(csaxs_bec.__file__)
|
||||
|
||||
logo_file_rel = "flOMNI.png"
|
||||
|
||||
# Build the absolute path correctly
|
||||
logo_file = (
|
||||
csaxs_bec_basepath.parent
|
||||
/ "bec_ipython_client"
|
||||
/ "plugins"
|
||||
/ "flomni"
|
||||
/ logo_file_rel
|
||||
).resolve()
|
||||
print(logo_file)
|
||||
bec.messaging.scilog.new().add_attachment(logo_file, width=200).add_text(content.replace("\n", "<br>")).add_tags("tomoscan").send()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
@@ -2181,4 +2340,4 @@ if __name__ == "__main__":
|
||||
builtins.__dict__["bec"] = bec
|
||||
builtins.__dict__["umv"] = umv
|
||||
flomni = Flomni(bec)
|
||||
flomni.start_x_ray_eye_alignment()
|
||||
flomni.start_x_ray_eye_alignment()
|
||||
|
||||
@@ -50,8 +50,6 @@ class FlomniOpticsMixin:
|
||||
# move both axes to the desired "in" positions
|
||||
umv(dev.feyex, feyex_in, dev.feyey, feyey_in)
|
||||
|
||||
self.xrayeye_update_frame()
|
||||
|
||||
def _ffzp_in(self):
|
||||
foptx_in = self._get_user_param_safe("foptx", "in")
|
||||
fopty_in = self._get_user_param_safe("fopty", "in")
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -18,10 +18,17 @@ class flomniGuiToolsError(Exception):
|
||||
|
||||
|
||||
class flomniGuiTools:
|
||||
GUI_RPC_TIMEOUT = 20
|
||||
|
||||
def __init__(self):
|
||||
self.text_box = None
|
||||
self.progressbar = None
|
||||
self.flomni_window = None
|
||||
self.xeyegui = None
|
||||
self.pdf_viewer = None
|
||||
self.idle_text_box = None
|
||||
self.camera_gripper_image = None
|
||||
self.camera_overview_image = None
|
||||
|
||||
def set_client(self, client):
|
||||
self.client = client
|
||||
@@ -29,9 +36,10 @@ class flomniGuiTools:
|
||||
|
||||
def flomnigui_show_gui(self):
|
||||
if "flomni" in self.gui.windows:
|
||||
self.flomni_window = self.gui.windows["flomni"]
|
||||
self.gui.flomni.raise_window()
|
||||
else:
|
||||
self.gui.new("flomni")
|
||||
self.flomni_window = self.gui.new("flomni", timeout=self.GUI_RPC_TIMEOUT)
|
||||
time.sleep(1)
|
||||
|
||||
def flomnigui_stop_gui(self):
|
||||
@@ -42,9 +50,11 @@ class flomniGuiTools:
|
||||
|
||||
def flomnigui_show_xeyealign(self):
|
||||
self.flomnigui_show_gui()
|
||||
if self._flomnigui_check_attribute_not_exists("xeyegui"):
|
||||
if self._flomnigui_is_missing("xeyegui"):
|
||||
self.flomnigui_remove_all_docks()
|
||||
self.xeyegui = self.gui.flomni.new("XRayEye", object_name="xrayeye")
|
||||
self.xeyegui = self.gui.flomni.new(
|
||||
"XRayEye", object_name="xrayeye", timeout=self.GUI_RPC_TIMEOUT
|
||||
)
|
||||
# start live
|
||||
if not dev.cam_xeye.live_mode_enabled.get():
|
||||
dev.cam_xeye.live_mode_enabled.put(True)
|
||||
@@ -52,9 +62,11 @@ class flomniGuiTools:
|
||||
|
||||
def flomnigui_show_xeyealign_fittab(self):
|
||||
self.flomnigui_show_gui()
|
||||
if self._flomnigui_check_attribute_not_exists("xeyegui"):
|
||||
if self._flomnigui_is_missing("xeyegui"):
|
||||
self.flomnigui_remove_all_docks()
|
||||
self.xeyegui = self.gui.flomni.new("XRayEye")
|
||||
self.xeyegui = self.gui.flomni.new(
|
||||
"XRayEye", object_name="xrayeye", timeout=self.GUI_RPC_TIMEOUT
|
||||
)
|
||||
self.xeyegui.switch_tab("fit")
|
||||
|
||||
def _flomnigui_check_attribute_not_exists(self, attribute_name):
|
||||
@@ -70,31 +82,39 @@ class flomniGuiTools:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _flomnigui_is_missing(self, attribute_name):
|
||||
widget = getattr(self, attribute_name, None)
|
||||
if widget is None:
|
||||
return True
|
||||
if hasattr(widget, "_is_deleted") and widget._is_deleted():
|
||||
return True
|
||||
return False
|
||||
|
||||
def flomnigui_show_cameras(self):
|
||||
self.flomnigui_show_gui()
|
||||
if self._flomnigui_check_attribute_not_exists(
|
||||
"cam_flomni_gripper"
|
||||
) or self._flomnigui_check_attribute_not_exists("cam_flomni_overview"):
|
||||
self.flomnigui_remove_all_docks()
|
||||
camera_gripper_image = self.gui.flomni.new("Image")
|
||||
self.camera_gripper_image = self.gui.flomni.new("Image")
|
||||
if self._flomnicam_check_device_exists(dev.cam_flomni_gripper):
|
||||
camera_gripper_image.image(device="cam_flomni_gripper", signal="preview")
|
||||
camera_gripper_image.lock_aspect_ratio = True
|
||||
camera_gripper_image.enable_fps_monitor = True
|
||||
camera_gripper_image.enable_toolbar = False
|
||||
camera_gripper_image.outer_axes = False
|
||||
camera_gripper_image.inner_axes = False
|
||||
self.camera_gripper_image.image(device="cam_flomni_gripper", signal="preview")
|
||||
self.camera_gripper_image.lock_aspect_ratio = True
|
||||
self.camera_gripper_image.enable_fps_monitor = True
|
||||
self.camera_gripper_image.enable_toolbar = False
|
||||
self.camera_gripper_image.outer_axes = False
|
||||
self.camera_gripper_image.inner_axes = False
|
||||
dev.cam_flomni_gripper.start_live_mode()
|
||||
else:
|
||||
print("Cannot open camera_gripper. Device does not exist.")
|
||||
camera_overview_image = self.gui.flomni.new("Image")
|
||||
self.camera_overview_image = self.gui.flomni.new("Image")
|
||||
if self._flomnicam_check_device_exists(dev.cam_flomni_overview):
|
||||
camera_overview_image.image(device="cam_flomni_overview", signal="preview")
|
||||
camera_overview_image.lock_aspect_ratio = True
|
||||
camera_overview_image.enable_fps_monitor = True
|
||||
camera_overview_image.enable_toolbar = False
|
||||
camera_overview_image.outer_axes = False
|
||||
camera_overview_image.inner_axes = False
|
||||
self.camera_overview_image.image(device="cam_flomni_overview", signal="preview")
|
||||
self.camera_overview_image.lock_aspect_ratio = True
|
||||
self.camera_overview_image.enable_fps_monitor = True
|
||||
self.camera_overview_image.enable_toolbar = False
|
||||
self.camera_overview_image.outer_axes = False
|
||||
self.camera_overview_image.inner_axes = False
|
||||
dev.cam_flomni_overview.start_live_mode()
|
||||
else:
|
||||
print("Cannot open camera_overview. Device does not exist.")
|
||||
@@ -104,15 +124,20 @@ class flomniGuiTools:
|
||||
# dev.cam_flomni_gripper.stop_live_mode()
|
||||
# dev.cam_xeye.live_mode = False
|
||||
if hasattr(self.gui, "flomni"):
|
||||
self.gui.flomni.delete_all()
|
||||
self.gui.flomni.delete_all(timeout=self.GUI_RPC_TIMEOUT)
|
||||
self.progressbar = None
|
||||
self.text_box = None
|
||||
self.xeyegui = None
|
||||
self.pdf_viewer = None
|
||||
self.idle_text_box = None
|
||||
self.camera_gripper_image = None
|
||||
self.camera_overview_image = None
|
||||
|
||||
def flomnigui_idle(self):
|
||||
self.flomnigui_show_gui()
|
||||
if self._flomnigui_check_attribute_not_exists("idle_text_box"):
|
||||
if self._flomnigui_is_missing("idle_text_box"):
|
||||
self.flomnigui_remove_all_docks()
|
||||
idle_text_box = self.gui.flomni.new("TextBox")
|
||||
self.idle_text_box = self.gui.flomni.new("TextBox")
|
||||
text = (
|
||||
"<pre>"
|
||||
+ "██████╗ ███████╗ ██████╗ ███████╗██╗ ██████╗ ███╗ ███╗███╗ ██╗██╗\n"
|
||||
@@ -123,7 +148,7 @@ class flomniGuiTools:
|
||||
+ "╚═════╝ ╚══════╝ ╚═════╝ ╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝\n"
|
||||
+ "</pre>"
|
||||
)
|
||||
idle_text_box.set_html_text(text)
|
||||
self.idle_text_box.set_html_text(text)
|
||||
|
||||
def flomnigui_docs(self, filename: str | None = None):
|
||||
import csaxs_bec
|
||||
@@ -167,7 +192,7 @@ class flomniGuiTools:
|
||||
# --- GUI handling (active existence check) ----------------------------
|
||||
self.flomnigui_show_gui()
|
||||
|
||||
if self._flomnigui_check_attribute_not_exists("PdfViewerWidget"):
|
||||
if self._flomnigui_is_missing("pdf_viewer"):
|
||||
self.flomnigui_remove_all_docks()
|
||||
self.pdf_viewer = self.gui.flomni.new(widget="PdfViewerWidget")
|
||||
|
||||
@@ -185,7 +210,7 @@ class flomniGuiTools:
|
||||
|
||||
def flomnigui_show_progress(self):
|
||||
self.flomnigui_show_gui()
|
||||
if self._flomnigui_check_attribute_not_exists("progressbar"):
|
||||
if self._flomnigui_is_missing("progressbar"):
|
||||
self.flomnigui_remove_all_docks()
|
||||
# Add a new dock with a RingProgressBar widget
|
||||
self.progressbar = self.gui.flomni.new("RingProgressBar")
|
||||
@@ -198,6 +223,14 @@ class flomniGuiTools:
|
||||
self._flomnigui_update_progress()
|
||||
|
||||
def _flomnigui_update_progress(self):
|
||||
"""Update the progress ring bar and center label from the current progress state.
|
||||
|
||||
``self.progress`` is backed by the BEC global variable ``tomo_progress``
|
||||
(see :class:`_ProgressProxy` in ``flomni.py``), so this method reflects
|
||||
the live state that is also accessible from other BEC client sessions via::
|
||||
|
||||
client.get_global_var("tomo_progress")
|
||||
"""
|
||||
main_progress_ring = self.progressbar.rings[0]
|
||||
subtomo_progress_ring = self.progressbar.rings[1]
|
||||
if self.progressbar is not None:
|
||||
@@ -210,6 +243,31 @@ class flomniGuiTools:
|
||||
main_progress_ring.set_value(progress)
|
||||
subtomo_progress_ring.set_value(subtomo_progress)
|
||||
|
||||
# --- format start time for display --------------------------------
|
||||
start_str = self.progress.get("tomo_start_time")
|
||||
if start_str is not None:
|
||||
import datetime as _dt
|
||||
start_display = _dt.datetime.fromisoformat(start_str).strftime("%Y-%m-%d %H:%M:%S")
|
||||
else:
|
||||
start_display = "N/A"
|
||||
|
||||
# --- format estimated remaining time ------------------------------
|
||||
remaining_s = self.progress.get("estimated_remaining_time")
|
||||
if remaining_s is not None and remaining_s >= 0:
|
||||
import datetime as _dt
|
||||
remaining_s = int(remaining_s)
|
||||
h, rem = divmod(remaining_s, 3600)
|
||||
m, s = divmod(rem, 60)
|
||||
if h > 0:
|
||||
eta_display = f"{h}h {m:02d}m {s:02d}s"
|
||||
elif m > 0:
|
||||
eta_display = f"{m}m {s:02d}s"
|
||||
else:
|
||||
eta_display = f"{s}s"
|
||||
else:
|
||||
eta_display = "N/A"
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
text = (
|
||||
f"Progress report:\n"
|
||||
f" Tomo type: {self.progress['tomo_type']}\n"
|
||||
@@ -218,7 +276,9 @@ class flomniGuiTools:
|
||||
f" Angle: {self.progress['angle']:.1f}\n"
|
||||
f" Current subtomo: {self.progress['subtomo']}\n"
|
||||
f" Current projection within subtomo: {self.progress['subtomo_projection']}\n"
|
||||
f" Total projections per subtomo: {int(self.progress['subtomo_total_projections'])}"
|
||||
f" Total projections per subtomo: {int(self.progress['subtomo_total_projections'])}\n"
|
||||
f" Scan started: {start_display}\n"
|
||||
f" Est. remaining: {eta_display}"
|
||||
)
|
||||
self.progressbar.set_center_label(text)
|
||||
|
||||
|
||||
@@ -253,6 +253,8 @@ class XrayEyeAlign:
|
||||
|
||||
umv(dev.rtx, 0)
|
||||
print("You are ready to remove the xray eye and start ptychography scans.")
|
||||
print("Fine alignment: flomni.tomo_parameters() , then flomni.tomo_alignment_scan()")
|
||||
print("After that, run the fit in Matlab and load the new fit flomni.read_alignment_offset()")
|
||||
|
||||
def write_output(self):
|
||||
file = os.path.expanduser("~/Data10/specES1/internal/xrayeye_alignmentvalues")
|
||||
|
||||
BIN
csaxs_bec/bec_ipython_client/plugins/omny/OMNY.png
Normal file
BIN
csaxs_bec/bec_ipython_client/plugins/omny/OMNY.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 359 KiB |
@@ -234,9 +234,10 @@ class TomoIDManager:
|
||||
)
|
||||
"""
|
||||
|
||||
OMNY_URL = "https://omny.web.psi.ch/samples/newmeasurement.php"
|
||||
OMNY_USER = "omny"
|
||||
OMNY_PASSWORD = "samples"
|
||||
#OMNY_URL = "https://omny.web.psi.ch/samples/newmeasurement.php"
|
||||
OMNY_URL = "https://v1p0zyg2w9n2k9c1.myfritz.net/samples/newmeasurement.php"
|
||||
OMNY_USER = ""
|
||||
OMNY_PASSWORD = ""
|
||||
TMP_FILE = "/tmp/currsamplesnr.txt"
|
||||
|
||||
def register(
|
||||
@@ -273,9 +274,14 @@ class TomoIDManager:
|
||||
f"&additional={additional_info}"
|
||||
f"&user={user}"
|
||||
)
|
||||
# subprocess.run(
|
||||
# f"wget --user={self.OMNY_USER} --password={self.OMNY_PASSWORD}"
|
||||
# f" -q -O {self.TMP_FILE} '{url}'",
|
||||
# shell=True,
|
||||
# )
|
||||
#print(url)
|
||||
subprocess.run(
|
||||
f"wget --user={self.OMNY_USER} --password={self.OMNY_PASSWORD}"
|
||||
f" -q -O {self.TMP_FILE} '{url}'",
|
||||
f"wget -q -O {self.TMP_FILE} '{url}'",
|
||||
shell=True,
|
||||
)
|
||||
with open(self.TMP_FILE) as f:
|
||||
|
||||
@@ -0,0 +1,96 @@
|
||||
"""
|
||||
omny/webpage_generator.py
|
||||
==========================
|
||||
OMNY-specific webpage generator subclass.
|
||||
|
||||
Integration (inside the OMNY __init__ / startup):
|
||||
--------------------------------------------------
|
||||
from csaxs_bec.bec_ipython_client.plugins.omny.webpage_generator import (
|
||||
OmnyWebpageGenerator,
|
||||
)
|
||||
self._webpage_gen = OmnyWebpageGenerator(
|
||||
bec_client=client,
|
||||
output_dir="~/data/raw/webpage/",
|
||||
)
|
||||
self._webpage_gen.start()
|
||||
|
||||
Or use the factory (auto-selects by session name "omny"):
|
||||
---------------------------------------------------------
|
||||
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
|
||||
make_webpage_generator,
|
||||
)
|
||||
self._webpage_gen = make_webpage_generator(bec, output_dir="~/data/raw/webpage/")
|
||||
self._webpage_gen.start()
|
||||
|
||||
Interactive helpers:
|
||||
--------------------
|
||||
omny._webpage_gen.status()
|
||||
omny._webpage_gen.verbosity = 2
|
||||
omny._webpage_gen.stop()
|
||||
omny._webpage_gen.start()
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
|
||||
WebpageGeneratorBase,
|
||||
_safe_get,
|
||||
_safe_float,
|
||||
_gvar,
|
||||
)
|
||||
|
||||
|
||||
class OmnyWebpageGenerator(WebpageGeneratorBase):
|
||||
"""
|
||||
OMNY-specific webpage generator.
|
||||
Logo: OMNY.png from the same directory as this file.
|
||||
|
||||
Override _collect_setup_data() to add OMNY-specific temperatures,
|
||||
sample name, and measurement settings.
|
||||
|
||||
The old OMNY spec webpage showed:
|
||||
- Cryo temperatures (XOMNY-TEMP-CRYO-A/B)
|
||||
- Per-channel temperatures (XOMNY-TEMP1..48)
|
||||
- Dewar pressure / LN2 flow
|
||||
- Interferometer strengths (OINTERF)
|
||||
Map these to BEC device paths below once available.
|
||||
"""
|
||||
|
||||
# TODO: fill in OMNY-specific device paths
|
||||
# label -> dotpath under device_manager.devices
|
||||
_TEMP_MAP = {
|
||||
# "Sample (cryo A)": "omny_temp.cryo_a",
|
||||
# "Cryo head (B)": "omny_temp.cryo_b",
|
||||
}
|
||||
|
||||
def _logo_path(self):
|
||||
return Path(__file__).parent / "OMNY.png"
|
||||
|
||||
def _collect_setup_data(self) -> dict:
|
||||
# ── OMNY-specific data goes here ──────────────────────────────
|
||||
# Uncomment and adapt when device names are known:
|
||||
#
|
||||
# dm = self._bec.device_manager
|
||||
# sample_name = _safe_get(dm, "omny_samples.sample_names.sample0") or "N/A"
|
||||
# temperatures = {
|
||||
# label: _safe_float(_safe_get(dm, path))
|
||||
# for label, path in self._TEMP_MAP.items()
|
||||
# }
|
||||
# settings = {
|
||||
# "Sample name": sample_name,
|
||||
# "FOV x / y": ...,
|
||||
# "Exposure time": _gvar(self._bec, "tomo_countingtime", ".3f", " s"),
|
||||
# "Angle step": _gvar(self._bec, "tomo_angle_stepsize", ".2f", "\u00b0"),
|
||||
# }
|
||||
# return {
|
||||
# "type": "omny",
|
||||
# "sample_name": sample_name,
|
||||
# "temperatures": temperatures,
|
||||
# "settings": settings,
|
||||
# }
|
||||
|
||||
# Placeholder — returns minimal info until implemented
|
||||
return {
|
||||
"type": "omny",
|
||||
# OMNY-specific data here
|
||||
}
|
||||
@@ -38,12 +38,14 @@ class XRayEye(RPCBase):
|
||||
None
|
||||
"""
|
||||
|
||||
@rpc_timeout(20)
|
||||
@rpc_call
|
||||
def on_live_view_enabled(self, enabled: "bool"):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@rpc_timeout(20)
|
||||
@rpc_call
|
||||
def on_motors_enable(self, x_enable: "bool", y_enable: "bool"):
|
||||
"""
|
||||
@@ -54,6 +56,7 @@ class XRayEye(RPCBase):
|
||||
y_enable(bool): enable y motor controls
|
||||
"""
|
||||
|
||||
@rpc_timeout(20)
|
||||
@rpc_call
|
||||
def enable_submit_button(self, enable: "bool"):
|
||||
"""
|
||||
@@ -90,12 +93,14 @@ class XRayEye(RPCBase):
|
||||
None
|
||||
"""
|
||||
|
||||
@rpc_timeout(20)
|
||||
@rpc_call
|
||||
def switch_tab(self, tab: "str"):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@rpc_timeout(20)
|
||||
@rpc_call
|
||||
def submit_fit_array(self, fit_array):
|
||||
"""
|
||||
|
||||
@@ -4,6 +4,7 @@ from bec_lib import bec_logger
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_qthemes import material_icon
|
||||
from bec_widgets import BECWidget, SafeProperty, SafeSlot
|
||||
from bec_widgets.utils.rpc_decorator import rpc_timeout
|
||||
from bec_widgets.widgets.plots.image.image import Image
|
||||
from bec_widgets.widgets.plots.waveform.waveform import Waveform
|
||||
from bec_widgets.widgets.plots.image.setting_widgets.image_roi_tree import ROIPropertyTree
|
||||
@@ -367,6 +368,7 @@ class XRayEye(BECWidget, QWidget):
|
||||
return self.message_line_edit.toPlainText()
|
||||
|
||||
@user_message.setter
|
||||
@rpc_timeout(20)
|
||||
def user_message(self, message: str):
|
||||
self.message_line_edit.setText(message)
|
||||
|
||||
@@ -375,6 +377,7 @@ class XRayEye(BECWidget, QWidget):
|
||||
return self.sample_name_line_edit.text()
|
||||
|
||||
@sample_name.setter
|
||||
@rpc_timeout(20)
|
||||
def sample_name(self, message: str):
|
||||
self.sample_name_line_edit.setText(message)
|
||||
|
||||
@@ -395,6 +398,7 @@ class XRayEye(BECWidget, QWidget):
|
||||
################################################################################
|
||||
|
||||
@SafeSlot(str)
|
||||
@rpc_timeout(20)
|
||||
def switch_tab(self, tab: str):
|
||||
if tab == "fit":
|
||||
self.tab_widget.setCurrentIndex(1)
|
||||
@@ -412,6 +416,7 @@ class XRayEye(BECWidget, QWidget):
|
||||
return roi.get_coordinates()
|
||||
|
||||
@SafeSlot(bool)
|
||||
@rpc_timeout(20)
|
||||
def on_live_view_enabled(self, enabled: bool):
|
||||
logger.info(f"Live view is enabled: {enabled}")
|
||||
self.live_preview_toggle.blockSignals(True)
|
||||
@@ -460,6 +465,7 @@ class XRayEye(BECWidget, QWidget):
|
||||
self.shutter_toggle.blockSignals(False)
|
||||
|
||||
@SafeSlot(bool, bool)
|
||||
@rpc_timeout(20)
|
||||
def on_motors_enable(self, x_enable: bool, y_enable: bool):
|
||||
"""
|
||||
Enable/Disable motor controls
|
||||
@@ -472,6 +478,7 @@ class XRayEye(BECWidget, QWidget):
|
||||
self.motor_control_2d.enable_controls_ver(y_enable)
|
||||
|
||||
@SafeSlot(bool)
|
||||
@rpc_timeout(20)
|
||||
def enable_submit_button(self, enable: bool):
|
||||
"""
|
||||
Enable/disable submit button.
|
||||
@@ -509,6 +516,7 @@ class XRayEye(BECWidget, QWidget):
|
||||
print(f"meta: {meta}")
|
||||
|
||||
@SafeSlot()
|
||||
@rpc_timeout(20)
|
||||
def submit_fit_array(self, fit_array):
|
||||
self.tab_widget.setCurrentIndex(1)
|
||||
# self.fix_x.title = " got fit array"
|
||||
|
||||
@@ -72,7 +72,7 @@ xbpm3x:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -95,7 +95,7 @@ xbpm3y:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -118,7 +118,7 @@ sl3trxi:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -141,7 +141,7 @@ sl3trxo:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -164,7 +164,7 @@ sl3trxb:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -187,7 +187,7 @@ sl3trxt:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -210,7 +210,7 @@ fast_shutter_n1_x:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -234,7 +234,7 @@ fast_shutter_o1_x:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -257,7 +257,7 @@ fast_shutter_o2_x:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -280,7 +280,7 @@ filter_array_1_x:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -303,7 +303,7 @@ filter_array_2_x:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -326,7 +326,7 @@ filter_array_3_x:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -349,7 +349,7 @@ filter_array_4_x:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -372,7 +372,7 @@ sl4trxi:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -395,7 +395,7 @@ sl4trxo:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -418,7 +418,7 @@ sl4trxb:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -441,7 +441,7 @@ sl4trxt:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -466,7 +466,7 @@ sl5trxi:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -489,7 +489,7 @@ sl5trxo:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -512,7 +512,7 @@ sl5trxb:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -535,7 +535,7 @@ sl5trxt:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -544,6 +544,66 @@ sl5trxt:
|
||||
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
|
||||
bl_smar_stage: 5
|
||||
|
||||
sl5ch:
|
||||
description: ESbox1 slit 5 center horizontal
|
||||
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitCenter
|
||||
deviceConfig:
|
||||
left_slit: sl5trxi
|
||||
right_slit: sl5trxo
|
||||
offset: 0
|
||||
enabled: true
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
needs:
|
||||
- sl5trxi
|
||||
- sl5trxo
|
||||
|
||||
sl5wh:
|
||||
description: ESbox1 slit 5 width horizontal
|
||||
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitWidth
|
||||
deviceConfig:
|
||||
left_slit: sl5trxi
|
||||
right_slit: sl5trxo
|
||||
offset: 0
|
||||
enabled: true
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
needs:
|
||||
- sl5trxi
|
||||
- sl5trxo
|
||||
|
||||
sl5cv:
|
||||
description: ESbox1 slit 5 center vertical
|
||||
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitCenter
|
||||
deviceConfig:
|
||||
left_slit: sl5trxb
|
||||
right_slit: sl5trxt
|
||||
offset: 0
|
||||
enabled: true
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
needs:
|
||||
- sl5trxb
|
||||
- sl5trxt
|
||||
|
||||
sl5wv:
|
||||
description: ESbox1 slit 5 width vertical
|
||||
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitWidth
|
||||
deviceConfig:
|
||||
left_slit: sl5trxb
|
||||
right_slit: sl5trxt
|
||||
offset: 0
|
||||
enabled: true
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
needs:
|
||||
- sl5trxb
|
||||
- sl5trxt
|
||||
|
||||
xbimtrx:
|
||||
description: ESbox2 beam intensity monitor x movement
|
||||
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
|
||||
@@ -558,7 +618,7 @@ xbimtrx:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -581,7 +641,7 @@ xbimtry:
|
||||
# precision: 3
|
||||
# tolerance: 0.005
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -589,3 +649,295 @@ xbimtry:
|
||||
init_position: 0
|
||||
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
|
||||
bl_smar_stage: 1
|
||||
|
||||
|
||||
|
||||
################### XBOX related ###################
|
||||
# we assue the epics settings for resolution, velocity etc. are correct
|
||||
# we do not overwrite from here
|
||||
|
||||
aptrx:
|
||||
description: Aperture pinhole X
|
||||
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
|
||||
deviceConfig:
|
||||
prefix: X12SA-ES1-PIN1:TRX1
|
||||
deviceTags:
|
||||
- cSAXS_ES
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
aptry:
|
||||
description: Aperture pinhole Y
|
||||
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
|
||||
deviceConfig:
|
||||
prefix: X12SA-ES1-PIN1:TRY1
|
||||
deviceTags:
|
||||
- cSAXS_ES
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
ebtrx:
|
||||
description: Exposure box aperture X
|
||||
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
|
||||
deviceConfig:
|
||||
prefix: X12SA-ES1-EB:TRX1
|
||||
deviceTags:
|
||||
- cSAXS_ES
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
ebtry:
|
||||
description: Exposure box aperture Y
|
||||
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
|
||||
deviceConfig:
|
||||
prefix: X12SA-ES1-EB:TRY1
|
||||
deviceTags:
|
||||
- cSAXS_ES
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
ebtrz:
|
||||
description: Exposure box aperture Z
|
||||
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
|
||||
deviceConfig:
|
||||
prefix: X12SA-ES1-EB:TRZ1
|
||||
deviceTags:
|
||||
- cSAXS_ES
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
ebsupport:
|
||||
description: Exposure box granite support Y
|
||||
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
|
||||
deviceConfig:
|
||||
prefix: X12SA-EH1-EB:TRY1
|
||||
deviceTags:
|
||||
- cSAXS_ES
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
fttrx1:
|
||||
description: FTS1 translation X
|
||||
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
|
||||
deviceConfig:
|
||||
prefix: X12SA-ES1-FTS1:TRX1
|
||||
deviceTags:
|
||||
- cSAXS_ES
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
fttry1:
|
||||
description: FTS1 translation Y
|
||||
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
|
||||
deviceConfig:
|
||||
prefix: X12SA-ES1-FTS1:TRY1
|
||||
deviceTags:
|
||||
- cSAXS_ES
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
fttrx2:
|
||||
description: FTS2 translation X
|
||||
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
|
||||
deviceConfig:
|
||||
prefix: X12SA-ES1-FTS2:TRX1
|
||||
deviceTags:
|
||||
- cSAXS_ES
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
fttry2:
|
||||
description: FTS2 translation Y
|
||||
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
|
||||
deviceConfig:
|
||||
prefix: X12SA-ES1-FTS2:TRY1
|
||||
deviceTags:
|
||||
- cSAXS_ES
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
fttrz:
|
||||
description: FTS1 translation Z
|
||||
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
|
||||
deviceConfig:
|
||||
prefix: X12SA-ES1-FTS1:TRZ1
|
||||
deviceTags:
|
||||
- cSAXS_ES
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
bs1x:
|
||||
description: Beamstop 1 X
|
||||
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
|
||||
deviceConfig:
|
||||
prefix: X12SA-ES1-BS1:TRX1
|
||||
deviceTags:
|
||||
- cSAXS_ES
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
bs1y:
|
||||
description: Beamstop 1 Y
|
||||
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
|
||||
deviceConfig:
|
||||
prefix: X12SA-ES1-BS1:TRY1
|
||||
deviceTags:
|
||||
- cSAXS_ES
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
bs2x:
|
||||
description: Beamstop 2 X
|
||||
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
|
||||
deviceConfig:
|
||||
prefix: X12SA-ES1-BS2:TRX1
|
||||
deviceTags:
|
||||
- cSAXS_ES
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
bs2y:
|
||||
description: Beamstop 2 Y
|
||||
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
|
||||
deviceConfig:
|
||||
prefix: X12SA-ES1-BS2:TRY1
|
||||
deviceTags:
|
||||
- cSAXS_ES
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
dttrx:
|
||||
description: Detector table X
|
||||
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
|
||||
deviceConfig:
|
||||
prefix: X12SA-ES1-DETT:TRX1
|
||||
deviceTags:
|
||||
- cSAXS_ES
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
dttry:
|
||||
description: Detector table Y
|
||||
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
|
||||
deviceConfig:
|
||||
prefix: X12SA-ES1-DETT:TRY1
|
||||
deviceTags:
|
||||
- cSAXS_ES
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
dttrz:
|
||||
description: Detector table Z
|
||||
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
|
||||
deviceConfig:
|
||||
prefix: X12SA-ES1-DETT:TRZ1
|
||||
deviceTags:
|
||||
- cSAXS_ES
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
dettrx:
|
||||
description: Detector 1 X
|
||||
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
|
||||
deviceConfig:
|
||||
prefix: X12SA-ES1-DET1:TRX1
|
||||
deviceTags:
|
||||
- cSAXS_ES
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
|
||||
####################
|
||||
### Beamstop diode control for flight tube
|
||||
####################
|
||||
|
||||
beamstop_gain_control:
|
||||
description: Gain control for beamstop flightube
|
||||
deviceClass: csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl
|
||||
deviceConfig:
|
||||
gain_lsb: galilrioesft.digital_out.ch0 # Pin 10 -> Galil ch0
|
||||
gain_mid: galilrioesft.digital_out.ch1 # Pin 11 -> Galil ch1
|
||||
gain_msb: galilrioesft.digital_out.ch2 # Pin 12 -> Galil ch2
|
||||
coupling: galilrioesft.digital_out.ch3 # Pin 13 -> Galil ch3
|
||||
speed_mode: galilrioesft.digital_out.ch4 # Pin 14 -> Galil ch4
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
onFailure: retry
|
||||
needs:
|
||||
- galilrioesft
|
||||
|
||||
galilrioesft:
|
||||
description: Galil RIO for remote gain switching and slow reading FlightTube
|
||||
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
|
||||
deviceConfig:
|
||||
host: galilrioesft.psi.ch
|
||||
enabled: true
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
|
||||
beamstop_dummy_bpm:
|
||||
description: BPM Xbox 2 (First Xbox in ES hutch)
|
||||
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
|
||||
deviceConfig:
|
||||
left_top: galilrioesft.analog_in.ch0
|
||||
right_top: galilrioesft.analog_in.ch1
|
||||
right_bot: galilrioesft.analog_in.ch2
|
||||
left_bot: galilrioesft.analog_in.ch3
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
onFailure: retry
|
||||
needs:
|
||||
- galilrioesft
|
||||
|
||||
beamstop_intensity:
|
||||
description: Beamstop intensity from Galil analog input ch6
|
||||
deviceClass: csaxs_bec.devices.pseudo_devices.signal_forwarder.SignalForwarder
|
||||
deviceConfig:
|
||||
signal: galilrioesft.analog_in.ch6
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
onFailure: retry
|
||||
needs:
|
||||
- galilrioesft
|
||||
|
||||
|
||||
|
||||
@@ -199,6 +199,25 @@ xbpm1c4:
|
||||
readOnly: true
|
||||
softwareTrigger: false
|
||||
|
||||
bpm1:
|
||||
description: 'XBPM1 (frontend)'
|
||||
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
|
||||
deviceConfig:
|
||||
left_top: xbpm1c1
|
||||
right_top: xbpm1c2
|
||||
right_bot: xbpm1c3
|
||||
left_bot: xbpm1c4
|
||||
onFailure: raise
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
readOnly: true
|
||||
softwareTrigger: false
|
||||
needs:
|
||||
- xbpm1c1
|
||||
- xbpm1c2
|
||||
- xbpm1c3
|
||||
- xbpm1c4
|
||||
|
||||
############################################
|
||||
######### End of xbpm sub devices ##########
|
||||
############################################
|
||||
|
||||
@@ -68,91 +68,110 @@ ccmx:
|
||||
- cSAXS
|
||||
- optics
|
||||
|
||||
# TO BE REVIEWED, REMOVE VELOCITY WITH NEW CLASS!
|
||||
ccm_energy:
|
||||
description: 'test'
|
||||
deviceClass: ophyd_devices.devices.simple_positioner.PSISimplePositioner
|
||||
deviceConfig:
|
||||
prefix: 'X12SA-OP-CCM1:'
|
||||
override_suffixes:
|
||||
user_readback: "ENERGY-GET"
|
||||
user_setpoint: "ENERGY-SET"
|
||||
velocity: "ROTY.VELO"
|
||||
motor_done_move: "ROTY.DMOV"
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
|
||||
|
||||
|
||||
##########################################################################
|
||||
######################## SMARACT STAGES ##################################
|
||||
##########################################################################
|
||||
|
||||
xbpm2x:
|
||||
description: X-ray beam position monitor 1 in OPbox
|
||||
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
|
||||
deviceConfig:
|
||||
axis_Id: A
|
||||
host: x12sa-eb-smaract-mcs-03.psi.ch
|
||||
limits:
|
||||
- -200
|
||||
- 200
|
||||
port: 5000
|
||||
sign: 1
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
userParameter:
|
||||
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
|
||||
bl_smar_stage: 0
|
||||
# xbpm2x:
|
||||
# description: X-ray beam position monitor 1 in OPbox
|
||||
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
|
||||
# deviceConfig:
|
||||
# axis_Id: A
|
||||
# host: x12sa-eb-smaract-mcs-03.psi.ch
|
||||
# limits:
|
||||
# - -200
|
||||
# - 200
|
||||
# port: 5000
|
||||
# sign: 1
|
||||
# enabled: true
|
||||
# onFailure: buffer
|
||||
# readOnly: false
|
||||
# readoutPriority: baseline
|
||||
# connectionTimeout: 20
|
||||
# userParameter:
|
||||
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
|
||||
# bl_smar_stage: 0
|
||||
|
||||
xbpm2y:
|
||||
description: X-ray beam position monitor 1 in OPbox
|
||||
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
|
||||
deviceConfig:
|
||||
axis_Id: B
|
||||
host: x12sa-eb-smaract-mcs-03.psi.ch
|
||||
limits:
|
||||
- -200
|
||||
- 200
|
||||
port: 5000
|
||||
sign: 1
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
userParameter:
|
||||
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
|
||||
bl_smar_stage: 1
|
||||
# xbpm2y:
|
||||
# description: X-ray beam position monitor 1 in OPbox
|
||||
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
|
||||
# deviceConfig:
|
||||
# axis_Id: B
|
||||
# host: x12sa-eb-smaract-mcs-03.psi.ch
|
||||
# limits:
|
||||
# - -200
|
||||
# - 200
|
||||
# port: 5000
|
||||
# sign: 1
|
||||
# enabled: true
|
||||
# onFailure: buffer
|
||||
# readOnly: false
|
||||
# readoutPriority: baseline
|
||||
# connectionTimeout: 20
|
||||
# userParameter:
|
||||
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
|
||||
# bl_smar_stage: 1
|
||||
|
||||
# cu_foilx:
|
||||
# description: Cu foil in OPbox
|
||||
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
|
||||
# deviceConfig:
|
||||
# axis_Id: C
|
||||
# host: x12sa-eb-smaract-mcs-03.psi.ch
|
||||
# limits:
|
||||
# - -200
|
||||
# - 200
|
||||
# port: 5000
|
||||
# sign: 1
|
||||
# enabled: true
|
||||
# onFailure: buffer
|
||||
# readOnly: false
|
||||
# readoutPriority: baseline
|
||||
# connectionTimeout: 20
|
||||
# userParameter:
|
||||
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
|
||||
# bl_smar_stage: 2
|
||||
|
||||
# scinx:
|
||||
# description: scintillator in OPbox
|
||||
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
|
||||
# deviceConfig:
|
||||
# axis_Id: D
|
||||
# host: x12sa-eb-smaract-mcs-03.psi.ch
|
||||
# limits:
|
||||
# - -200
|
||||
# - 200
|
||||
# port: 5000
|
||||
# sign: 1
|
||||
# enabled: true
|
||||
# onFailure: buffer
|
||||
# readOnly: false
|
||||
# readoutPriority: baseline
|
||||
# connectionTimeout: 20
|
||||
# userParameter:
|
||||
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
|
||||
# bl_smar_stage: 3
|
||||
|
||||
cu_foilx:
|
||||
description: Cu foil in OPbox
|
||||
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
|
||||
deviceConfig:
|
||||
axis_Id: C
|
||||
host: x12sa-eb-smaract-mcs-03.psi.ch
|
||||
limits:
|
||||
- -200
|
||||
- 200
|
||||
port: 5000
|
||||
sign: 1
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
userParameter:
|
||||
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
|
||||
bl_smar_stage: 2
|
||||
|
||||
scinx:
|
||||
description: scintillator in OPbox
|
||||
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
|
||||
deviceConfig:
|
||||
axis_Id: D
|
||||
host: x12sa-eb-smaract-mcs-03.psi.ch
|
||||
limits:
|
||||
- -200
|
||||
- 200
|
||||
port: 5000
|
||||
sign: 1
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
userParameter:
|
||||
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
|
||||
bl_smar_stage: 3
|
||||
|
||||
|
||||
# dmm1_trx_readback_example: # This is the same template as for i.e. bpm4i
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
# This is the main configuration file that is
|
||||
# commented or uncommented according to the type of experiment
|
||||
|
||||
# optics:
|
||||
# - !include ./bl_optics_hutch.yaml
|
||||
optics:
|
||||
- !include ./bl_optics_hutch.yaml
|
||||
|
||||
# frontend:
|
||||
# - !include ./bl_frontend.yaml
|
||||
|
||||
@@ -395,6 +395,16 @@ rtz:
|
||||
readoutPriority: on_request
|
||||
connectionTimeout: 20
|
||||
|
||||
rt_positions:
|
||||
deviceClass: csaxs_bec.devices.omny.rt.rt_flomni_ophyd.RtFlomniFlyer
|
||||
deviceConfig:
|
||||
host: mpc2844.psi.ch
|
||||
port: 2222
|
||||
readoutPriority: async
|
||||
connectionTimeout: 20
|
||||
enabled: true
|
||||
readOnly: False
|
||||
|
||||
############################################################
|
||||
####################### Cameras ############################
|
||||
############################################################
|
||||
@@ -512,6 +522,19 @@ omny_panda:
|
||||
FMC_IN.VAL2.Min: cap_voltage_fzp_x_min
|
||||
FMC_IN.VAL2.Max: cap_voltage_fzp_x_max
|
||||
FMC_IN.VAL2.Mean: cap_voltage_fzp_x_mean
|
||||
INENC1.VAL.Max: interf_st_fzp_y_max
|
||||
INENC1.VAL.Mean: interf_st_fzp_y_mean
|
||||
INENC1.VAL.Min: interf_st_fzp_y_min
|
||||
INENC2.VAL.Max: interf_st_fzp_x_max
|
||||
INENC2.VAL.Mean: interf_st_fzp_x_mean
|
||||
INENC2.VAL.Min: interf_st_fzp_x_min
|
||||
INENC3.VAL.Max: interf_st_rotz_max
|
||||
INENC3.VAL.Mean: interf_st_rotz_mean
|
||||
INENC3.VAL.Min: interf_st_rotz_min
|
||||
INENC4.VAL.Max: interf_st_rotx_max
|
||||
INENC4.VAL.Mean: interf_st_rotx_mean
|
||||
INENC4.VAL.Min: interf_st_rotx_min
|
||||
PCAP.GATE_DURATION.Value: pcap_gate_duration_value
|
||||
deviceTags:
|
||||
- detector
|
||||
enabled: true
|
||||
|
||||
24
csaxs_bec/device_configs/test_config.yaml
Normal file
24
csaxs_bec/device_configs/test_config.yaml
Normal file
@@ -0,0 +1,24 @@
|
||||
galilrioesxbox:
|
||||
description: Galil RIO for remote gain switching and slow reading ES XBox
|
||||
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
|
||||
deviceConfig:
|
||||
host: galilrioesft.psi.ch
|
||||
enabled: true
|
||||
onFailure: raise
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
bpm1:
|
||||
readoutPriority: baseline
|
||||
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
|
||||
deviceConfig:
|
||||
blade_t: galilrioesxbox.analog_in.ch0
|
||||
blade_r: galilrioesxbox.analog_in.ch1
|
||||
blade_b: galilrioesxbox.analog_in.ch2
|
||||
blade_l: galilrioesxbox.analog_in.ch3
|
||||
enabled: true
|
||||
readOnly: false
|
||||
softwareTrigger: true
|
||||
needs:
|
||||
- galilrioesxbox
|
||||
|
||||
@@ -317,8 +317,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
try:
|
||||
scan_done = bool(value == self._num_total_triggers)
|
||||
self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done)
|
||||
if scan_done:
|
||||
self._scan_done_event.set()
|
||||
except Exception:
|
||||
content = traceback.format_exc()
|
||||
logger.info(f"Device {self.name} error: {content}")
|
||||
@@ -393,6 +391,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
self._current_data_index = 0
|
||||
|
||||
# NOTE Make sure that the signal that omits mca callbacks is cleared
|
||||
# DO NOT REMOVE!!
|
||||
self._omit_mca_callbacks.clear()
|
||||
|
||||
# For a fly scan we need to start the mcs card ourselves
|
||||
@@ -563,8 +562,9 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Hook called when the device is stopped. In addition, any status that is registered through cancel_on_stop will be cancelled here."""
|
||||
self.stop_all.put(1)
|
||||
self.erase_all.put(1)
|
||||
with suppress_mca_callbacks(self):
|
||||
self.stop_all.put(1)
|
||||
self.erase_all.put(1)
|
||||
|
||||
def mcs_recovery(self, timeout: int = 1) -> None:
|
||||
"""
|
||||
|
||||
@@ -1,20 +1,18 @@
|
||||
import threading
|
||||
import time
|
||||
from typing import List
|
||||
|
||||
import numpy as np
|
||||
from bec_lib import bec_logger, messages
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device, PositionerBase, Signal
|
||||
from ophyd.status import wait as status_wait
|
||||
from ophyd.utils import LimitError
|
||||
from ophyd_devices import AsyncMultiSignal, DeviceStatus, ProgressSignal
|
||||
from ophyd_devices.utils.controller import Controller, threadlocked
|
||||
from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected
|
||||
from prettytable import PrettyTable
|
||||
|
||||
from csaxs_bec.devices.omny.rt.rt_ophyd import (
|
||||
BECConfigError,
|
||||
RtCommunicationError,
|
||||
RtError,
|
||||
RtReadbackSignal,
|
||||
@@ -432,27 +430,6 @@ class RtFlomniController(Controller):
|
||||
t.add_row([i, self.read_ssi_interferometer(i)])
|
||||
print(t)
|
||||
|
||||
def _get_signals_from_table(self, return_table) -> dict:
|
||||
self.average_stdeviations_x_st_fzp += float(return_table[4])
|
||||
self.average_stdeviations_y_st_fzp += float(return_table[7])
|
||||
signals = {
|
||||
"target_x": {"value": float(return_table[2])},
|
||||
"average_x_st_fzp": {"value": float(return_table[3])},
|
||||
"stdev_x_st_fzp": {"value": float(return_table[4])},
|
||||
"target_y": {"value": float(return_table[5])},
|
||||
"average_y_st_fzp": {"value": float(return_table[6])},
|
||||
"stdev_y_st_fzp": {"value": float(return_table[7])},
|
||||
"average_rotz": {"value": float(return_table[8])},
|
||||
"stdev_rotz": {"value": float(return_table[9])},
|
||||
"average_stdeviations_x_st_fzp": {
|
||||
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
|
||||
},
|
||||
"average_stdeviations_y_st_fzp": {
|
||||
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
|
||||
},
|
||||
}
|
||||
return signals
|
||||
|
||||
@threadlocked
|
||||
def start_scan(self):
|
||||
if not self.feedback_is_running():
|
||||
@@ -492,91 +469,6 @@ class RtFlomniController(Controller):
|
||||
current_position_in_scan = int(float(return_table[2]))
|
||||
return (mode, number_of_positions_planned, current_position_in_scan)
|
||||
|
||||
def read_positions_from_sampler(self):
|
||||
# this was for reading after the scan completed
|
||||
number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read
|
||||
|
||||
read_counter = 0
|
||||
|
||||
self.average_stdeviations_x_st_fzp = 0
|
||||
self.average_stdeviations_y_st_fzp = 0
|
||||
self.average_lamni_angle = 0
|
||||
|
||||
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
|
||||
|
||||
# if not (mode==2 or mode==3):
|
||||
# error
|
||||
self.device_manager.connector.set(
|
||||
MessageEndpoints.device_status("rt_scan"),
|
||||
messages.DeviceStatusMessage(
|
||||
device="rt_scan", status=1, metadata=self.readout_metadata
|
||||
),
|
||||
)
|
||||
# while scan is running
|
||||
while mode > 0:
|
||||
|
||||
# TODO here?: scan abortion if no progress in scan *raise error
|
||||
|
||||
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
|
||||
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
|
||||
time.sleep(0.01)
|
||||
if current_position_in_scan > 5:
|
||||
while current_position_in_scan > read_counter + 1:
|
||||
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
|
||||
# logger.info(f"{return_table}")
|
||||
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
|
||||
|
||||
read_counter = read_counter + 1
|
||||
|
||||
signals = self._get_signals_from_table(return_table)
|
||||
|
||||
self.publish_device_data(signals=signals, point_id=int(return_table[0]))
|
||||
|
||||
time.sleep(0.05)
|
||||
|
||||
# read the last samples even though scan is finished already
|
||||
while number_of_positions_planned > read_counter:
|
||||
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
|
||||
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
|
||||
# logger.info(f"{return_table}")
|
||||
read_counter = read_counter + 1
|
||||
|
||||
signals = self._get_signals_from_table(return_table)
|
||||
self.publish_device_data(signals=signals, point_id=int(return_table[0]))
|
||||
|
||||
self.device_manager.connector.set(
|
||||
MessageEndpoints.device_status("rt_scan"),
|
||||
messages.DeviceStatusMessage(
|
||||
device="rt_scan", status=0, metadata=self.readout_metadata
|
||||
),
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Flomni statistics: Average of all standard deviations: x"
|
||||
f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y"
|
||||
f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}"
|
||||
)
|
||||
|
||||
def publish_device_data(self, signals, point_id):
|
||||
self.device_manager.connector.set_and_publish(
|
||||
MessageEndpoints.device_read("rt_flomni"),
|
||||
messages.DeviceMessage(
|
||||
signals=signals, metadata={"point_id": point_id, **self.readout_metadata}
|
||||
),
|
||||
)
|
||||
|
||||
def start_readout(self):
|
||||
readout = threading.Thread(target=self.read_positions_from_sampler)
|
||||
readout.start()
|
||||
|
||||
def kickoff(self, metadata):
|
||||
self.readout_metadata = metadata
|
||||
while not self._min_scan_buffer_reached:
|
||||
time.sleep(0.001)
|
||||
self.start_scan()
|
||||
time.sleep(0.1)
|
||||
self.start_readout()
|
||||
|
||||
|
||||
class RtFlomniReadbackSignal(RtReadbackSignal):
|
||||
@retry_once
|
||||
@@ -844,6 +736,185 @@ class RtFlomniMotor(Device, PositionerBase):
|
||||
return super().stop(success=success)
|
||||
|
||||
|
||||
class RtFlomniFlyer(Device):
|
||||
USER_ACCESS = ["controller"]
|
||||
data = Cpt(
|
||||
AsyncMultiSignal,
|
||||
name="data",
|
||||
signals=[
|
||||
"target_x",
|
||||
"average_x_st_fzp",
|
||||
"stdev_x_st_fzp",
|
||||
"target_y",
|
||||
"average_y_st_fzp",
|
||||
"stdev_y_st_fzp",
|
||||
"average_rotz",
|
||||
"stdev_rotz",
|
||||
"average_stdeviations_x_st_fzp",
|
||||
"average_stdeviations_y_st_fzp",
|
||||
],
|
||||
ndim=1,
|
||||
async_update={"type": "add", "max_shape": [None]},
|
||||
max_size=1000,
|
||||
)
|
||||
progress = Cpt(
|
||||
ProgressSignal, doc="ProgressSignal indicating the progress of the device during a scan."
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
prefix="",
|
||||
*,
|
||||
name,
|
||||
kind=None,
|
||||
read_attrs=None,
|
||||
configuration_attrs=None,
|
||||
parent=None,
|
||||
host="mpc2844.psi.ch",
|
||||
port=2222,
|
||||
socket_cls=SocketIO,
|
||||
device_manager=None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(prefix=prefix, name=name, parent=parent, **kwargs)
|
||||
self.shutdown_event = threading.Event()
|
||||
self.controller = RtFlomniController(
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
|
||||
)
|
||||
self.average_stdeviations_x_st_fzp = 0
|
||||
self.average_stdeviations_y_st_fzp = 0
|
||||
self.average_lamni_angle = 0
|
||||
self.readout_thread = None
|
||||
self.scan_done_event = threading.Event()
|
||||
self.scan_done_event.set()
|
||||
|
||||
def read_positions_from_sampler(self, status: DeviceStatus):
|
||||
"""
|
||||
Read the positions from the sampler and update the data signal.
|
||||
This function runs in a separate thread and continuously checks the
|
||||
scan status.
|
||||
|
||||
Args:
|
||||
status (DeviceStatus): The status object to update when the readout is complete.
|
||||
"""
|
||||
read_counter = 0
|
||||
self.average_stdeviations_x_st_fzp = 0
|
||||
self.average_stdeviations_y_st_fzp = 0
|
||||
self.average_lamni_angle = 0
|
||||
|
||||
mode, number_of_positions_planned, current_position_in_scan = (
|
||||
self.controller.get_scan_status()
|
||||
)
|
||||
|
||||
# while scan is running
|
||||
while mode > 0 and not self.shutdown_event.wait(0.01):
|
||||
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
|
||||
mode, number_of_positions_planned, current_position_in_scan = (
|
||||
self.controller.get_scan_status()
|
||||
)
|
||||
if current_position_in_scan > 5:
|
||||
while current_position_in_scan > read_counter + 1:
|
||||
return_table = (
|
||||
self.controller.socket_put_and_receive(f"r{read_counter}")
|
||||
).split(",")
|
||||
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
|
||||
self.progress.put(
|
||||
value=read_counter, max_value=number_of_positions_planned, done=False
|
||||
)
|
||||
read_counter = read_counter + 1
|
||||
signals = self._get_signals_from_table(return_table)
|
||||
self.data.set(signals)
|
||||
|
||||
if self.shutdown_event.wait(0.05):
|
||||
logger.info("Shutdown event set, stopping readout.")
|
||||
# if we are here, the shutdown_event is set. We can exit the readout loop.
|
||||
status.set_finished()
|
||||
return
|
||||
|
||||
# read the last samples even though scan is finished already
|
||||
while number_of_positions_planned > read_counter and not self.shutdown_event.is_set():
|
||||
return_table = (self.controller.socket_put_and_receive(f"r{read_counter}")).split(",")
|
||||
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
|
||||
self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=False)
|
||||
read_counter = read_counter + 1
|
||||
|
||||
signals = self._get_signals_from_table(return_table)
|
||||
self.data.set(signals)
|
||||
|
||||
# NOTE: No need to set the status to failed if the shutdown_event is set.
|
||||
# The stop() method will take care of that.
|
||||
status.set_finished()
|
||||
self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=True)
|
||||
|
||||
logger.info(
|
||||
"Flomni statistics: Average of all standard deviations: x"
|
||||
f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y"
|
||||
f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}"
|
||||
)
|
||||
|
||||
def _get_signals_from_table(self, return_table) -> dict:
|
||||
self.average_stdeviations_x_st_fzp += float(return_table[4])
|
||||
self.average_stdeviations_y_st_fzp += float(return_table[7])
|
||||
signals = {
|
||||
"target_x": {"value": float(return_table[2])},
|
||||
"average_x_st_fzp": {"value": float(return_table[3])},
|
||||
"stdev_x_st_fzp": {"value": float(return_table[4])},
|
||||
"target_y": {"value": float(return_table[5])},
|
||||
"average_y_st_fzp": {"value": float(return_table[6])},
|
||||
"stdev_y_st_fzp": {"value": float(return_table[7])},
|
||||
"average_rotz": {"value": float(return_table[8])},
|
||||
"stdev_rotz": {"value": float(return_table[9])},
|
||||
"average_stdeviations_x_st_fzp": {
|
||||
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
|
||||
},
|
||||
"average_stdeviations_y_st_fzp": {
|
||||
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
|
||||
},
|
||||
}
|
||||
return signals
|
||||
|
||||
def stage(self):
|
||||
self.shutdown_event.clear()
|
||||
self.scan_done_event.set()
|
||||
return super().stage()
|
||||
|
||||
def start_readout(self, status: DeviceStatus):
|
||||
self.readout_thread = threading.Thread(
|
||||
target=self.read_positions_from_sampler, args=(status,)
|
||||
)
|
||||
self.readout_thread.start()
|
||||
|
||||
def kickoff(self) -> DeviceStatus:
|
||||
self.shutdown_event.clear()
|
||||
self.scan_done_event.clear()
|
||||
while not self.controller._min_scan_buffer_reached and not self.shutdown_event.wait(0.001):
|
||||
...
|
||||
self.controller.start_scan()
|
||||
self.shutdown_event.wait(0.1)
|
||||
status = DeviceStatus(self)
|
||||
status.set_finished()
|
||||
return status
|
||||
|
||||
def complete(self) -> DeviceStatus:
|
||||
"""Wait until the flyer is done."""
|
||||
if self.scan_done_event.is_set():
|
||||
# if the scan_done_event is already set, we can return a finished status immediately
|
||||
status = DeviceStatus(self)
|
||||
status.set_finished()
|
||||
return status
|
||||
status = DeviceStatus(self)
|
||||
self.start_readout(status)
|
||||
status.add_callback(lambda *args, **kwargs: self.scan_done_event.set())
|
||||
return status
|
||||
|
||||
def stop(self, *, success=False):
|
||||
self.shutdown_event.set()
|
||||
self.scan_done_event.set()
|
||||
if self.readout_thread is not None:
|
||||
self.readout_thread.join()
|
||||
return super().stop(success=success)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
rtcontroller = RtFlomniController(
|
||||
socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222, device_manager=None
|
||||
|
||||
@@ -13,6 +13,14 @@ from ophyd_devices import PSIDeviceBase
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class MonitorSignal(Signal):
|
||||
"""A simple wrapper around ophyd Signal that automatically monitors the signal for changes."""
|
||||
|
||||
def __init__(self, *, name, auto_monitor=False, **kwargs):
|
||||
super().__init__(name=name, **kwargs)
|
||||
self.auto_monitor = auto_monitor
|
||||
|
||||
|
||||
class OMNYFastShutter(PSIDeviceBase, Device):
|
||||
"""
|
||||
Fast Shutter control for OMNY setup. If started with at the beamline, it will expose
|
||||
@@ -26,7 +34,7 @@ class OMNYFastShutter(PSIDeviceBase, Device):
|
||||
SUB_VALUE = "value"
|
||||
_default_sub = SUB_VALUE
|
||||
|
||||
shutter = Cpt(Signal, name="shutter")
|
||||
shutter = Cpt(MonitorSignal, name="shutter", auto_monitor=True)
|
||||
|
||||
# -----------------------------------------------------
|
||||
# User-facing shutter control functions
|
||||
@@ -48,7 +56,6 @@ class OMNYFastShutter(PSIDeviceBase, Device):
|
||||
def fshopen(self):
|
||||
"""Open the fast shutter."""
|
||||
if self._check_if_cSAXS_shutter_exists_in_config():
|
||||
self.shutter.put(1)
|
||||
return self.device_manager.devices["fsh"].fshopen()
|
||||
else:
|
||||
self.shutter.put(1)
|
||||
@@ -56,7 +63,6 @@ class OMNYFastShutter(PSIDeviceBase, Device):
|
||||
def fshclose(self):
|
||||
"""Close the fast shutter."""
|
||||
if self._check_if_cSAXS_shutter_exists_in_config():
|
||||
self.shutter.put(0)
|
||||
return self.device_manager.devices["fsh"].fshclose()
|
||||
else:
|
||||
self.shutter.put(0)
|
||||
|
||||
0
csaxs_bec/devices/pseudo_devices/__init__.py
Normal file
0
csaxs_bec/devices/pseudo_devices/__init__.py
Normal file
172
csaxs_bec/devices/pseudo_devices/bpm.py
Normal file
172
csaxs_bec/devices/pseudo_devices/bpm.py
Normal file
@@ -0,0 +1,172 @@
|
||||
"""Module for a BPM pseudo device that computes the position and intensity from the blade signals."""
|
||||
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Kind, Signal
|
||||
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
|
||||
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
|
||||
|
||||
|
||||
class BPM(PSIPseudoDeviceBase):
|
||||
"""BPM positioner pseudo device."""
|
||||
|
||||
# Blade signals, a,b,c,d
|
||||
left_top = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="left_top",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM left_top blade",
|
||||
)
|
||||
right_top = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="right_top",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM right_top blade",
|
||||
)
|
||||
right_bot = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="right_bot",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM right_bottom blade",
|
||||
)
|
||||
left_bot = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="left_bot",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM left_bot blade",
|
||||
)
|
||||
|
||||
# Virtual signals
|
||||
pos_x = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="pos_x",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM X position, -1 fully left, 1 fully right",
|
||||
)
|
||||
pos_y = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="pos_y",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM Y position, -1 fully bottom, 1 fully top",
|
||||
)
|
||||
diagonal = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="diagonal",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM diagonal, -1 fully diagonal left_top-right_bot, 1 fully diagonal right_top-left_bot",
|
||||
)
|
||||
intensity = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="intensity",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM intensity",
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name,
|
||||
left_top: str,
|
||||
right_top: str,
|
||||
right_bot: str,
|
||||
left_bot: str,
|
||||
device_manager=None,
|
||||
scan_info=None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
|
||||
# Get all blade signal objects from utility method
|
||||
signal_t = self.left_top.get_device_object_from_bec(
|
||||
object_name=left_top, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
signal_r = self.right_top.get_device_object_from_bec(
|
||||
object_name=right_top, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
signal_b = self.right_bot.get_device_object_from_bec(
|
||||
object_name=right_bot, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
signal_l = self.left_bot.get_device_object_from_bec(
|
||||
object_name=left_bot, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
|
||||
# Set compute methods for blade signals and virtual signals
|
||||
self.left_top.set_compute_method(self._compute_blade_signal, signal=signal_t)
|
||||
self.right_top.set_compute_method(self._compute_blade_signal, signal=signal_r)
|
||||
self.right_bot.set_compute_method(self._compute_blade_signal, signal=signal_b)
|
||||
self.left_bot.set_compute_method(self._compute_blade_signal, signal=signal_l)
|
||||
|
||||
self.intensity.set_compute_method(
|
||||
self._compute_intensity,
|
||||
left_top=self.left_top,
|
||||
right_top=self.right_top,
|
||||
right_bot=self.right_bot,
|
||||
left_bot=self.left_bot,
|
||||
)
|
||||
self.pos_x.set_compute_method(
|
||||
self._compute_pos_x,
|
||||
left_bot=self.left_bot,
|
||||
left_top=self.left_top,
|
||||
right_top=self.right_top,
|
||||
right_bot=self.right_bot,
|
||||
)
|
||||
self.pos_y.set_compute_method(
|
||||
self._compute_pos_y,
|
||||
left_bot=self.left_bot,
|
||||
left_top=self.left_top,
|
||||
right_top=self.right_top,
|
||||
right_bot=self.right_bot,
|
||||
)
|
||||
self.diagonal.set_compute_method(
|
||||
self._compute_diagonal,
|
||||
left_bot=self.left_bot,
|
||||
left_top=self.left_top,
|
||||
right_top=self.right_top,
|
||||
right_bot=self.right_bot,
|
||||
)
|
||||
|
||||
def _compute_blade_signal(self, signal: Signal) -> float:
|
||||
return signal.get()
|
||||
|
||||
def _compute_intensity(
|
||||
self, left_top: Signal, right_top: Signal, right_bot: Signal, left_bot: Signal
|
||||
) -> float:
|
||||
intensity = left_top.get() + right_top.get() + right_bot.get() + left_bot.get()
|
||||
return intensity
|
||||
|
||||
def _compute_pos_x(
|
||||
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
|
||||
) -> float:
|
||||
"""X position from -1 to 1, where -1 means beam fully on the left side, 1 means beam fully on the right side."""
|
||||
sum_left = left_bot.get() + left_top.get()
|
||||
sum_right = right_top.get() + right_bot.get()
|
||||
sum_total = sum_left + sum_right
|
||||
if sum_total == 0:
|
||||
return 0.0
|
||||
return (sum_right - sum_left) / sum_total
|
||||
|
||||
def _compute_pos_y(
|
||||
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
|
||||
) -> float:
|
||||
"""Y position from -1 to 1, where -1 means beam fully on the bottom side, 1 means beam fully on the top side."""
|
||||
sum_top = left_top.get() + right_top.get()
|
||||
sum_bot = right_bot.get() + left_bot.get()
|
||||
sum_total = sum_top + sum_bot
|
||||
if sum_total == 0:
|
||||
return 0.0
|
||||
return (sum_top - sum_bot) / sum_total
|
||||
|
||||
def _compute_diagonal(
|
||||
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
|
||||
) -> float:
|
||||
sum_diag1 = left_bot.get() + right_top.get()
|
||||
sum_diag2 = left_top.get() + right_bot.get()
|
||||
sum_total = sum_diag1 + sum_diag2
|
||||
if sum_total == 0:
|
||||
return 0.0
|
||||
return (sum_diag1 - sum_diag2) / sum_total
|
||||
189
csaxs_bec/devices/pseudo_devices/bpm_control.py
Normal file
189
csaxs_bec/devices/pseudo_devices/bpm_control.py
Normal file
@@ -0,0 +1,189 @@
|
||||
"""
|
||||
Module for controlling the BPM amplifier settings, such as gain and coupling.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Kind
|
||||
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
|
||||
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_server.device_server.devices.devicemanager import DeviceManagerDS
|
||||
from ophyd import Signal
|
||||
|
||||
_GAIN_BITS_LOW_NOISE: dict[tuple, int] = {
|
||||
(0, 0, 0): int(1e3),
|
||||
(0, 0, 1): int(1e4),
|
||||
(0, 1, 0): int(1e5),
|
||||
(0, 1, 1): int(1e6),
|
||||
(1, 0, 0): int(1e7),
|
||||
(1, 0, 1): int(1e8),
|
||||
(1, 1, 0): int(1e9),
|
||||
}
|
||||
|
||||
_GAIN_BITS_HIGH_SPEED: dict[tuple, int] = {
|
||||
(0, 0, 0): int(1e5),
|
||||
(0, 0, 1): int(1e6),
|
||||
(0, 1, 0): int(1e7),
|
||||
(0, 1, 1): int(1e8),
|
||||
(1, 0, 0): int(1e9),
|
||||
(1, 0, 1): int(1e10),
|
||||
(1, 1, 0): int(1e11),
|
||||
}
|
||||
|
||||
_GAIN_TO_BITS: dict[int, tuple] = {}
|
||||
for _bits, _gain in _GAIN_BITS_LOW_NOISE.items():
|
||||
_GAIN_TO_BITS[_gain] = (*_bits, True)
|
||||
for _bits, _gain in _GAIN_BITS_HIGH_SPEED.items():
|
||||
if _gain not in _GAIN_TO_BITS: # low-noise takes priority
|
||||
_GAIN_TO_BITS[_gain] = (*_bits, False)
|
||||
|
||||
VALID_GAINS = sorted(_GAIN_TO_BITS.keys())
|
||||
|
||||
|
||||
class BPMControl(PSIPseudoDeviceBase):
|
||||
"""
|
||||
BPM amplifier control pseudo device. It is responsible for controlling the
|
||||
gain and coupling for the BPM amplifier. It relies on signals from a device
|
||||
in BEC to be available. For cSAXS, these are most liikely to be from the
|
||||
GalilRIO device that controls the BPM amplifier.
|
||||
|
||||
Args:
|
||||
name (str): Name of the pseudo device.
|
||||
gain_lsb (str): Name of the signal in BEC that controls the LSB
|
||||
of the gain setting.
|
||||
gain_mid (str): Name of the signal in BEC that controls the MID
|
||||
bit of the gain setting.
|
||||
gain_msb (str): Name of the signal in BEC that controls the MSB
|
||||
of the gain setting.
|
||||
coupling (str): Name of the signal in BEC that controls the coupling
|
||||
setting.
|
||||
speed_mode (str): Name of the signal in BEC that controls the speed mode
|
||||
(low-noise vs high-speed) of the amplifier.
|
||||
"""
|
||||
|
||||
USER_ACCESS = ["set_gain", "set_coupling"]
|
||||
|
||||
gain = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="gain",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="Gain of the amplifier",
|
||||
)
|
||||
coupling = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="coupling",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="Coupling of the amplifier",
|
||||
)
|
||||
speed = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="speed",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="Speed of the amplifier",
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
gain_lsb: str,
|
||||
gain_mid: str,
|
||||
gain_msb: str,
|
||||
coupling: str,
|
||||
speed_mode: str,
|
||||
device_manager: DeviceManagerDS | None = None,
|
||||
scan_info: ScanInfo | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
|
||||
|
||||
# First we get all signal objects from BEC using the utility method provided by the BECProcessedSignal class.
|
||||
self._gain_lsb = self.gain.get_device_object_from_bec(
|
||||
object_name=gain_lsb, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
self._gain_mid = self.gain.get_device_object_from_bec(
|
||||
object_name=gain_mid, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
self._gain_msb = self.gain.get_device_object_from_bec(
|
||||
object_name=gain_msb, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
self._coupling = self.gain.get_device_object_from_bec(
|
||||
object_name=coupling, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
self._speed_mode = self.gain.get_device_object_from_bec(
|
||||
object_name=speed_mode, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
|
||||
# Set the compute methods for the virtual signals.
|
||||
self.gain.set_compute_method(
|
||||
self._compute_gain,
|
||||
msb=self._gain_msb,
|
||||
mid=self._gain_mid,
|
||||
lsb=self._gain_lsb,
|
||||
speed_mode=self._speed_mode,
|
||||
)
|
||||
self.coupling.set_compute_method(self._compute_coupling, coupling=self._coupling)
|
||||
self.speed.set_compute_method(self._compute_speed, speed=self._speed_mode)
|
||||
|
||||
def set_gain(
|
||||
self,
|
||||
gain: Literal[
|
||||
1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000, 100000000000
|
||||
],
|
||||
) -> None:
|
||||
"""
|
||||
Set the gain of the amplifier.
|
||||
|
||||
Args:
|
||||
gain (Literal): Must be one of 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000.
|
||||
"""
|
||||
gain_int = int(gain)
|
||||
if gain_int not in VALID_GAINS:
|
||||
raise ValueError(
|
||||
f"{self.name} received invalid gain {gain_int}, must be in {VALID_GAINS}"
|
||||
)
|
||||
|
||||
msb, mid, lsb, use_low_noise = _GAIN_TO_BITS[gain_int]
|
||||
|
||||
self._gain_msb.set(bool(msb)).wait(timeout=2)
|
||||
self._gain_lsb.set(bool(lsb)).wait(timeout=2)
|
||||
self._gain_mid.set(bool(mid)).wait(timeout=2)
|
||||
self._speed_mode.set(bool(use_low_noise))
|
||||
|
||||
def set_coupling(self, coupling: Literal["AC", "DC"]) -> None:
|
||||
"""
|
||||
Set the coupling of the amplifier.
|
||||
|
||||
Args:
|
||||
coupling (Literal): Must be either "AC" or "DC".
|
||||
"""
|
||||
if coupling not in ["AC", "DC"]:
|
||||
raise ValueError(
|
||||
f"{self.name} received invalid coupling value {coupling}, please use 'AC' or 'DC'"
|
||||
)
|
||||
self._coupling.set(coupling == "DC").wait(timeout=2)
|
||||
|
||||
def _compute_gain(self, msb: Signal, mid: Signal, lsb: Signal, speed_mode: Signal) -> int:
|
||||
"""Compute the gain based on the bits and speed mode."""
|
||||
bits = (msb.get(), mid.get(), lsb.get())
|
||||
speed_mode = speed_mode.get()
|
||||
if speed_mode:
|
||||
return _GAIN_BITS_LOW_NOISE.get(bits)
|
||||
else:
|
||||
return _GAIN_BITS_HIGH_SPEED.get(bits)
|
||||
|
||||
def _compute_coupling(self, coupling: Signal) -> str:
|
||||
"""Compute the coupling based on the signal."""
|
||||
return "DC" if coupling.get() else "AC"
|
||||
|
||||
def _compute_speed(self, speed: Signal) -> str:
|
||||
"""Compute the speed based on the signal."""
|
||||
return "low_speed" if speed.get() else "high_speed"
|
||||
1
csaxs_bec/devices/pseudo_devices/dlpca200_settings.py
Normal file
1
csaxs_bec/devices/pseudo_devices/dlpca200_settings.py
Normal file
@@ -0,0 +1 @@
|
||||
# from ophyd
|
||||
41
csaxs_bec/devices/pseudo_devices/signal_forwarder.py
Normal file
41
csaxs_bec/devices/pseudo_devices/signal_forwarder.py
Normal file
@@ -0,0 +1,41 @@
|
||||
"""
|
||||
Pseudo device that forwards a single BEC signal 1:1.
|
||||
"""
|
||||
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Kind, Signal
|
||||
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
|
||||
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
|
||||
|
||||
|
||||
class SignalForwarder(PSIPseudoDeviceBase):
|
||||
"""Forward one signal unchanged."""
|
||||
|
||||
signal = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="signal",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="Forwarded signal",
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name,
|
||||
signal: str,
|
||||
device_manager=None,
|
||||
scan_info=None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
|
||||
|
||||
src = self.signal.get_device_object_from_bec(
|
||||
object_name=signal,
|
||||
signal_name=self.name,
|
||||
device_manager=device_manager,
|
||||
)
|
||||
|
||||
self.signal.set_compute_method(self._compute_signal, signal=src)
|
||||
|
||||
def _compute_signal(self, signal: Signal) -> float:
|
||||
return signal.get()
|
||||
@@ -1 +1 @@
|
||||
from .csaxs_nexus import NeXus_format as cSAXS_NeXus_format
|
||||
from .csaxs_nexus import cSAXSNeXusFormat
|
||||
|
||||
@@ -1,445 +1,470 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
import numpy as np
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_lib.devicemanager import DeviceManagerBase
|
||||
from bec_server.file_writer.file_writer import HDF5Storage
|
||||
from bec_server.file_writer.default_writer import DefaultFormat
|
||||
|
||||
|
||||
def get_entry(data: dict, name: str, default=None) -> Any:
|
||||
class cSAXSNeXusFormat(DefaultFormat):
|
||||
"""
|
||||
Get an entry from the scan data assuming a <device>.<device>.value structure.
|
||||
NeXus file format for the cSAXS beamline (BEC era).
|
||||
|
||||
Args:
|
||||
data (dict): Scan data
|
||||
name (str): Entry name
|
||||
default (Any, optional): Default value. Defaults to None.
|
||||
Mirrors the old SPEC layout.xml hierarchy and adds the flOMNI instrument
|
||||
group for the nano-positioning stage used in ptychography.
|
||||
|
||||
Device resilience
|
||||
-----------------
|
||||
Every device read (self.get_entry / device call) is wrapped in try/except.
|
||||
If a device is removed from the BEC config file between sessions it simply
|
||||
disappears from the device_manager — the corresponding dataset or link is
|
||||
silently omitted from the HDF5 file without raising an error. This means
|
||||
the file structure is additive: re-add the device to the config and the
|
||||
field reappears automatically on the next scan.
|
||||
|
||||
Top-level HDF5 structure
|
||||
────────────────────────
|
||||
/entry NXentry (definition = NXptycho)
|
||||
/sample NXsample ← primary sample group
|
||||
/entry_ptycho NXentry ← generic ptycho entry
|
||||
/data_soft NXentry ← convenience Eiger frame links
|
||||
/control NXmonitor
|
||||
/instrument NXinstrument
|
||||
/source
|
||||
/insertion_device
|
||||
/monochromator
|
||||
/XBPM3
|
||||
/slit_3 … slit_5
|
||||
/filter_set
|
||||
/beam_stop_1 … beam_stop_2
|
||||
/eiger_1_5 NXdetector
|
||||
/mcs NXdetector
|
||||
/flOMNI NXpositioner
|
||||
|
||||
Device name mapping (old SPEC → current BEC)
|
||||
────────────────────────────────────────────
|
||||
samx / samy → samx / samy (generic; kept for non-flOMNI configs)
|
||||
sl3wh/wv/ch/cv → sl3trxi/o/b/t (individual blade motors; gap/centre TODO)
|
||||
sl4wh/wv/ch/cv → sl4trxi/o/b/t
|
||||
sl5wh/wv/ch/cv → sl5trxi/o/b/t
|
||||
bs1x / bs1y → bs1x / bs1y
|
||||
bs2x / bs2y → bs2x / bs2y
|
||||
dettrx → dettrx
|
||||
eiger_4 → eiger_1_5
|
||||
mcs → mcs
|
||||
filter_array → filter_array_1_x … filter_array_4_x
|
||||
xbpm3 → xbpm3x / xbpm3y (stage positions; signal readouts TODO)
|
||||
energy → ccm_energy
|
||||
|
||||
TODO (devices not yet in BEC list)
|
||||
───────────────────────────────────
|
||||
curr, idgap ring current, undulator gap
|
||||
moth1, mobd monochromator crystal angles
|
||||
mith, mibd, mirror_coating mirror
|
||||
bpm3s/x/y/z XBPM3 signal readouts
|
||||
sl0 / sl1 / sl2 upstream optics-hutch slits
|
||||
slit gap / centre derived from blade pairs + calibration offset
|
||||
"""
|
||||
if isinstance(data.get(name), list) and isinstance(data.get(name)[0], dict):
|
||||
return [sub_data.get(name, {}).get("value", default) for sub_data in data.get(name)]
|
||||
|
||||
return data.get(name, {}).get(name, {}).get("value", default)
|
||||
# -------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
def _safe_dataset(self, group, name: str, device: str,
|
||||
units: str | None = None,
|
||||
description: str | None = None) -> None:
|
||||
"""
|
||||
Write a dataset from the BEC scan data dictionary.
|
||||
Silently skips if the device was not recorded in this scan
|
||||
(e.g. removed from config, readoutPriority=on_request and not triggered,
|
||||
or the scan finished before the device responded).
|
||||
"""
|
||||
try:
|
||||
value = self.get_entry(device)
|
||||
ds = group.create_dataset(name, data=value)
|
||||
if units:
|
||||
ds.attrs["units"] = units
|
||||
if description:
|
||||
ds.attrs["description"] = description
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _safe_soft_link(self, group, name: str, target: str) -> None:
|
||||
"""Create a soft link; silently skip on any error."""
|
||||
try:
|
||||
group.create_soft_link(name, target)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _slit_blades(self, group, prefix: str) -> None:
|
||||
"""
|
||||
Store individual blade motor positions for a 4-blade slit set.
|
||||
Derived quantities (gap, centre) require a per-slit calibration offset
|
||||
and will be added in a later update.
|
||||
"""
|
||||
for blade, motor in [
|
||||
("inner_x", f"{prefix}trxi"),
|
||||
("outer_x", f"{prefix}trxo"),
|
||||
("bottom_y", f"{prefix}trxb"),
|
||||
("top_y", f"{prefix}trxt"),
|
||||
]:
|
||||
self._safe_dataset(group, blade, motor, units="mm")
|
||||
|
||||
|
||||
def NeXus_format(
|
||||
storage: HDF5Storage, data: dict, file_references: dict, device_manager: DeviceManagerBase
|
||||
) -> HDF5Storage:
|
||||
"""
|
||||
Prepare the NeXus file format.
|
||||
# -------------------------------------------------------------------------
|
||||
# Main format method
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
Args:
|
||||
storage (HDF5Storage): HDF5 storage. Pseudo hdf5 file container that will be written to disk later.
|
||||
data (dict): scan data
|
||||
file_references (dict): File references. Can be used to add external files to the HDF5 file. The path is given relative to the HDF5 file.
|
||||
device_manager (DeviceManagerBase): Device manager. Can be used to check if devices are available.
|
||||
def format(self) -> None:
|
||||
"""Build the NeXus/HDF5 layout for a cSAXS scan."""
|
||||
|
||||
Returns:
|
||||
HDF5Storage: Updated HDF5 storage
|
||||
"""
|
||||
# /entry
|
||||
entry = storage.create_group("entry")
|
||||
entry.attrs["NX_class"] = "NXentry"
|
||||
entry.attrs["definition"] = "NXsas"
|
||||
entry.attrs["start_time"] = data.get("start_time")
|
||||
entry.attrs["end_time"] = data.get("end_time")
|
||||
entry.attrs["version"] = 1.0
|
||||
# Canonical paths referenced by multiple groups
|
||||
RT_POS_PATH = "/entry/instrument/flOMNI/rt_positions"
|
||||
EIGER_COLL = "/entry/collection/file_references/eiger_1_5"
|
||||
|
||||
# /entry/collection
|
||||
collection = entry.create_group("collection")
|
||||
collection.attrs["NX_class"] = "NXcollection"
|
||||
bec_collection = collection.create_group("bec")
|
||||
# ── Root entry ────────────────────────────────────────────────────────
|
||||
entry = self.storage.create_group("entry")
|
||||
entry.attrs["NX_class"] = "NXentry"
|
||||
entry.attrs["definition"] = "NXptycho"
|
||||
|
||||
# /entry/control
|
||||
control = entry.create_group("control")
|
||||
control.attrs["NX_class"] = "NXmonitor"
|
||||
control.create_dataset(name="mode", data="monitor")
|
||||
control.create_dataset(name="integral", data=get_entry(data, "bpm4i"))
|
||||
# ── /entry/sample ─────────────────────────────────────────────────────
|
||||
# Primary sample group. Contains the name of the mounted sample and a
|
||||
# link to the real-time scan positions. Generic samx/samy are recorded
|
||||
# here so the group is meaningful for non-flOMNI configurations too.
|
||||
sample = entry.create_group("sample")
|
||||
sample.attrs["NX_class"] = "NXsample"
|
||||
# Soft-link name directly to the value BEC recorded in the collection.
|
||||
# Only written when flomni_samples is present; other configs leave name absent.
|
||||
if "flomni_samples" in self.device_manager.devices:
|
||||
self._safe_soft_link(
|
||||
sample, "name",
|
||||
"/entry/collection/devices/flomni_samples"
|
||||
"/flomni_samples_sample_names_sample0/value",
|
||||
)
|
||||
# Generic coarse stage positions (meaningful in non-flOMNI setups)
|
||||
self._safe_dataset(sample, "x_translation", "samx", units="mm")
|
||||
self._safe_dataset(sample, "y_translation", "samy", units="mm")
|
||||
# Real-time encoder positions — the primary scan coordinate
|
||||
self._safe_soft_link(sample, "positions", RT_POS_PATH)
|
||||
|
||||
# /entry/data
|
||||
main_data = entry.create_group("data")
|
||||
main_data.attrs["NX_class"] = "NXdata"
|
||||
if "eiger_4" in device_manager.devices:
|
||||
main_data.create_soft_link(name="data", target="/entry/instrument/eiger_4/data")
|
||||
elif "eiger9m" in device_manager.devices:
|
||||
main_data.create_soft_link(name="data", target="/entry/instrument/eiger9m/data")
|
||||
elif "pilatus_2" in device_manager.devices:
|
||||
main_data.create_soft_link(name="data", target="/entry/instrument/pilatus_2/data")
|
||||
# ── /entry/entry_ptycho ───────────────────────────────────────────────
|
||||
# Generic ptychography entry. Detector data and scan positions are
|
||||
# linked in from the instrument groups so this entry is self-contained
|
||||
# for downstream reconstruction codes.
|
||||
entry_ptycho = entry.create_group("entry_ptycho")
|
||||
entry_ptycho.attrs["NX_class"] = "NXentry"
|
||||
entry_ptycho.attrs["definition"] = "NXptycho"
|
||||
|
||||
# /entry/sample
|
||||
control = entry.create_group("sample")
|
||||
control.attrs["NX_class"] = "NXsample"
|
||||
control.create_dataset(name="name", data=get_entry(data, "samplename"))
|
||||
control.create_dataset(name="description", data=data.get("sample_description"))
|
||||
x_translation = control.create_dataset(name="x_translation", data=get_entry(data, "samx"))
|
||||
x_translation.attrs["units"] = "mm"
|
||||
y_translation = control.create_dataset(name="y_translation", data=get_entry(data, "samy"))
|
||||
y_translation.attrs["units"] = "mm"
|
||||
temperature_log = control.create_dataset(name="temperature_log", data=get_entry(data, "temp"))
|
||||
temperature_log.attrs["units"] = "K"
|
||||
nxdata = entry_ptycho.create_group("data")
|
||||
nxdata.attrs["NX_class"] = "NXdata"
|
||||
nxdata.attrs["signal"] = "data"
|
||||
# Detector frames
|
||||
try:
|
||||
for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys():
|
||||
self._safe_soft_link(nxdata, k, f"{EIGER_COLL}/{k}")
|
||||
except Exception:
|
||||
pass
|
||||
# Scan positions
|
||||
self._safe_soft_link(nxdata, "positions", RT_POS_PATH)
|
||||
|
||||
# /entry/instrument
|
||||
instrument = entry.create_group("instrument")
|
||||
instrument.attrs["NX_class"] = "NXinstrument"
|
||||
instrument.create_dataset(name="name", data="cSAXS beamline")
|
||||
# Link to the primary sample group
|
||||
self._safe_soft_link(entry_ptycho, "sample", "/entry/sample")
|
||||
|
||||
source = instrument.create_group("source")
|
||||
source.attrs["NX_class"] = "NXsource"
|
||||
source.create_dataset(name="type", data="Synchrotron X-ray Source")
|
||||
source.create_dataset(name="name", data="Swiss Light Source")
|
||||
source.create_dataset(name="probe", data="x-ray")
|
||||
distance = source.create_dataset(
|
||||
name="distance", data=-33800 - np.asarray(get_entry(data, "samz", 0))
|
||||
)
|
||||
distance.attrs["units"] = "mm"
|
||||
sigma_x = source.create_dataset(name="sigma_x", data=0.202)
|
||||
sigma_x.attrs["units"] = "mm"
|
||||
sigma_y = source.create_dataset(name="sigma_y", data=0.018)
|
||||
sigma_y.attrs["units"] = "mm"
|
||||
divergence_x = source.create_dataset(name="divergence_x", data=0.000135)
|
||||
divergence_x.attrs["units"] = "radians"
|
||||
divergence_y = source.create_dataset(name="divergence_y", data=0.000025)
|
||||
divergence_y.attrs["units"] = "radians"
|
||||
current = source.create_dataset(name="current", data=get_entry(data, "curr"))
|
||||
current.attrs["units"] = "mA"
|
||||
# ── /entry/data_soft ──────────────────────────────────────────────────
|
||||
# Convenience group mirroring the old /entry/data hardlink from layout.xml.
|
||||
data_soft = entry.create_group("data_soft")
|
||||
data_soft.attrs["NX_class"] = "NXentry"
|
||||
try:
|
||||
for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys():
|
||||
self._safe_soft_link(data_soft, k, f"{EIGER_COLL}/{k}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
insertion_device = instrument.create_group("insertion_device")
|
||||
insertion_device.attrs["NX_class"] = "NXinsertion_device"
|
||||
source.create_dataset(name="type", data="undulator")
|
||||
gap = source.create_dataset(name="gap", data=get_entry(data, "idgap"))
|
||||
gap.attrs["units"] = "mm"
|
||||
k = source.create_dataset(name="k", data=2.46)
|
||||
k.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
length = source.create_dataset(name="length", data=1820)
|
||||
length.attrs["units"] = "mm"
|
||||
# ── /entry/control ────────────────────────────────────────────────────
|
||||
control = entry.create_group("control")
|
||||
control.attrs["NX_class"] = "NXmonitor"
|
||||
control.create_dataset("mode", data="monitor")
|
||||
# TODO: beam intensity integral — add device when available
|
||||
# self._safe_dataset(control, "integral", "bpm_sum", units="NX_DIMENSIONLESS")
|
||||
|
||||
slit_0 = instrument.create_group("slit_0")
|
||||
slit_0.attrs["NX_class"] = "NXslit"
|
||||
source.create_dataset(name="material", data="OFHC Cu")
|
||||
source.create_dataset(name="description", data="Horizontal secondary source slit")
|
||||
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl0wh"))
|
||||
x_gap.attrs["units"] = "mm"
|
||||
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl0ch"))
|
||||
x_translation.attrs["units"] = "mm"
|
||||
distance = source.create_dataset(
|
||||
name="distance", data=-21700 - np.asarray(get_entry(data, "samz", 0))
|
||||
)
|
||||
distance.attrs["units"] = "mm"
|
||||
# ── /entry/instrument ─────────────────────────────────────────────────
|
||||
instrument = entry.create_group("instrument")
|
||||
instrument.attrs["NX_class"] = "NXinstrument"
|
||||
instrument.create_dataset("name", data="cSAXS beamline")
|
||||
|
||||
slit_1 = instrument.create_group("slit_1")
|
||||
slit_1.attrs["NX_class"] = "NXslit"
|
||||
source.create_dataset(name="material", data="OFHC Cu")
|
||||
source.create_dataset(name="description", data="Horizontal secondary source slit")
|
||||
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl1wh"))
|
||||
x_gap.attrs["units"] = "mm"
|
||||
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl1wv"))
|
||||
y_gap.attrs["units"] = "mm"
|
||||
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl1ch"))
|
||||
x_translation.attrs["units"] = "mm"
|
||||
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl1ch"))
|
||||
height.attrs["units"] = "mm"
|
||||
distance = source.create_dataset(
|
||||
name="distance", data=-7800 - np.asarray(get_entry(data, "samz", 0))
|
||||
)
|
||||
distance.attrs["units"] = "mm"
|
||||
# ── Source ────────────────────────────────────────────────────────────
|
||||
# Numerical values are currently unknown and stored as 0.
|
||||
# Will be updated once the corresponding devices are in BEC.
|
||||
source = instrument.create_group("source")
|
||||
source.attrs["NX_class"] = "NXsource"
|
||||
source.create_dataset("type", data="Synchrotron X-ray Source")
|
||||
source.create_dataset("name", data="Swiss Light Source")
|
||||
source.create_dataset("probe", data="x-ray")
|
||||
source.create_dataset("sigma_x", data=0.0).attrs["units"] = "mm"
|
||||
source.create_dataset("sigma_y", data=0.0).attrs["units"] = "mm"
|
||||
source.create_dataset("divergence_x", data=0.0).attrs["units"] = "radians"
|
||||
source.create_dataset("divergence_y", data=0.0).attrs["units"] = "radians"
|
||||
# TODO: current — add device when available
|
||||
# self._safe_dataset(source, "current", "curr", units="mA")
|
||||
|
||||
mono = instrument.create_group("monochromator")
|
||||
mono.attrs["NX_class"] = "NXmonochromator"
|
||||
mokev = data.get("mokev", {})
|
||||
if mokev:
|
||||
if isinstance(mokev, list):
|
||||
mokev = mokev[0]
|
||||
wavelength = mono.create_dataset(
|
||||
name="wavelength", data=12.3984193 / (mokev.get("mokev").get("value") + 1e-9)
|
||||
# ── Insertion device ──────────────────────────────────────────────────
|
||||
insertion_device = instrument.create_group("insertion_device")
|
||||
insertion_device.attrs["NX_class"] = "NXinsertion_device"
|
||||
insertion_device.create_dataset("type", data="undulator")
|
||||
insertion_device.create_dataset("k", data=0.0)
|
||||
insertion_device.create_dataset("length", data=0.0).attrs["units"] = "mm"
|
||||
# TODO: gap — add device when available
|
||||
# self._safe_dataset(insertion_device, "gap", "idgap", units="mm")
|
||||
|
||||
# ── Monochromator ─────────────────────────────────────────────────────
|
||||
# ccm_energy is a baseline device and is recorded in the scan data.
|
||||
mono = instrument.create_group("monochromator")
|
||||
mono.attrs["NX_class"] = "NXmonochromator"
|
||||
mono.create_dataset("type", data="Double crystal fixed exit monochromator.")
|
||||
try:
|
||||
energy_kev = self.get_entry("ccm_energy")
|
||||
energy_arr = np.asarray(energy_kev, dtype=float)
|
||||
en_ds = mono.create_dataset("energy", data=energy_arr)
|
||||
en_ds.attrs["units"] = "keV"
|
||||
with np.errstate(divide="ignore", invalid="ignore"):
|
||||
wavelength = np.where(energy_arr != 0, 12.3984193 / energy_arr, 0.0)
|
||||
wl_ds = mono.create_dataset("wavelength", data=wavelength)
|
||||
wl_ds.attrs["units"] = "Angstrom"
|
||||
except Exception:
|
||||
pass
|
||||
# TODO: crystal angles — add moth1 / mobd when available
|
||||
# crystal_1 = mono.create_group("crystal_1")
|
||||
# crystal_1.attrs["NX_class"] = "NXcrystal"
|
||||
# crystal_1.create_dataset("usage", data="Bragg")
|
||||
# crystal_1.create_dataset("type", data="Si")
|
||||
# crystal_1.create_dataset("order_no", data=1.0)
|
||||
# crystal_1.create_dataset("reflection", data="[1 1 1]")
|
||||
# self._safe_dataset(crystal_1, "bragg_angle", "moth1", units="degrees")
|
||||
# crystal_2 = mono.create_group("crystal_2")
|
||||
# crystal_2.attrs["NX_class"] = "NXcrystal"
|
||||
# crystal_2.create_dataset("usage", data="Bragg")
|
||||
# crystal_2.create_dataset("type", data="Si")
|
||||
# crystal_2.create_dataset("order_no", data=2.0)
|
||||
# crystal_2.create_dataset("reflection", data="[1 1 1]")
|
||||
# self._safe_dataset(crystal_2, "bragg_angle", "moth1", units="degrees")
|
||||
# self._safe_dataset(crystal_2, "bend_x", "mobd", units="degrees")
|
||||
|
||||
# ── Mirror ────────────────────────────────────────────────────────────
|
||||
# TODO: mith, mibd, mirror_coating not yet in device list
|
||||
# mirror = instrument.create_group("mirror")
|
||||
# mirror.attrs["NX_class"] = "NXmirror"
|
||||
# mirror.create_dataset("type", data="single")
|
||||
# mirror.create_dataset(
|
||||
# "description",
|
||||
# data=(
|
||||
# "Grazing incidence mirror to reject high-harmonic wavelengths. "
|
||||
# "Three coating options: no coating (SiO2), rhodium (Rh), platinum (Pt)."
|
||||
# ),
|
||||
# )
|
||||
# mirror.create_dataset("substrate_material", data="SiO2")
|
||||
# self._safe_dataset(mirror, "incident_angle", "mith", units="degrees")
|
||||
# self._safe_dataset(mirror, "coating_material", "mirror_coating", units="NX_CHAR")
|
||||
# self._safe_dataset(mirror, "bend_y", "mibd", units="NX_DIMENSIONLESS")
|
||||
|
||||
# ── Upstream slits (optics hutch) ─────────────────────────────────────
|
||||
# TODO: slit_0 / slit_1 / slit_2 motors not yet in BEC device list
|
||||
# slit_0 = instrument.create_group("slit_0")
|
||||
# ...
|
||||
# slit_1 = instrument.create_group("slit_1")
|
||||
# ...
|
||||
# slit_2 = instrument.create_group("slit_2")
|
||||
# ...
|
||||
|
||||
# ── XBPM3 ─────────────────────────────────────────────────────────────
|
||||
# xbpm3x/xbpm3y are stage motor positions for aligning the monitor.
|
||||
# Signal readouts (sum/x/y/skew) are TODO once MCS channels are mapped.
|
||||
xbpm3 = instrument.create_group("XBPM3")
|
||||
xbpm3.attrs["NX_class"] = "NXdetector"
|
||||
xbpm3.attrs["description"] = "X-ray beam position monitor 3, experimental hutch"
|
||||
self._safe_dataset(xbpm3, "x_stage", "xbpm3x", units="mm",
|
||||
description="XBPM3 stage x-translation")
|
||||
self._safe_dataset(xbpm3, "y_stage", "xbpm3y", units="mm",
|
||||
description="XBPM3 stage y-translation")
|
||||
# TODO: signal readout sub-groups once MCS channels are configured
|
||||
# for suffix, entry_name, desc in [
|
||||
# ("sum", "bpm3s", "Sum of counts for the four quadrants."),
|
||||
# ("x", "bpm3x", "Normalized diff, left vs right quadrants."),
|
||||
# ("y", "bpm3y", "Normalized diff, high vs low quadrants."),
|
||||
# ("skew", "bpm3z", "Normalized diff, diagonal quadrants."),
|
||||
# ]:
|
||||
# g = xbpm3.create_group(f"XBPM3_{suffix}")
|
||||
# self._safe_dataset(g, "data", entry_name, units="NX_DIMENSIONLESS")
|
||||
# g.create_dataset("description", data=desc)
|
||||
|
||||
# ── Slit 3 (experimental hutch, exposure box) ─────────────────────────
|
||||
slit_3 = instrument.create_group("slit_3")
|
||||
slit_3.attrs["NX_class"] = "NXslit"
|
||||
slit_3.create_dataset("material", data="Si")
|
||||
slit_3.create_dataset("description", data="Slit 3, experimental hutch, exposure box")
|
||||
# TODO: gap / centre require per-slit calibration offset — add later
|
||||
self._slit_blades(slit_3, "sl3")
|
||||
|
||||
# ── Filter set ────────────────────────────────────────────────────────
|
||||
filter_set = instrument.create_group("filter_set")
|
||||
filter_set.attrs["NX_class"] = "NXattenuator"
|
||||
filter_set.create_dataset("material", data="Si")
|
||||
filter_set.create_dataset(
|
||||
"description",
|
||||
data=(
|
||||
"Four linear filter stages (filter_array_1_x … filter_array_4_x). "
|
||||
"Each stage has five filter positions plus an 'out' position."
|
||||
),
|
||||
)
|
||||
wavelength.attrs["units"] = "Angstrom"
|
||||
energy = mono.create_dataset(name="energy", data=mokev.get("mokev").get("value"))
|
||||
energy.attrs["units"] = "keV"
|
||||
mono.create_dataset(name="type", data="Double crystal fixed exit monochromator.")
|
||||
distance = mono.create_dataset(
|
||||
name="distance", data=-5220 - np.asarray(get_entry(data, "samz", 0))
|
||||
)
|
||||
distance.attrs["units"] = "mm"
|
||||
for i in range(1, 5):
|
||||
self._safe_dataset(filter_set, f"stage_{i}_x",
|
||||
f"filter_array_{i}_x", units="mm")
|
||||
# TODO: attenuator_transmission = 10^(ftrans) once device is available
|
||||
|
||||
crystal_1 = mono.create_group("crystal_1")
|
||||
crystal_1.attrs["NX_class"] = "NXcrystal"
|
||||
crystal_1.create_dataset(name="usage", data="Bragg")
|
||||
crystal_1.create_dataset(name="order_no", data="1")
|
||||
crystal_1.create_dataset(name="reflection", data="[1 1 1]")
|
||||
bragg_angle = crystal_1.create_dataset(name="bragg_angle", data=get_entry(data, "moth1"))
|
||||
bragg_angle.attrs["units"] = "degrees"
|
||||
# ── Slit 4 (experimental hutch, exposure box) ─────────────────────────
|
||||
slit_4 = instrument.create_group("slit_4")
|
||||
slit_4.attrs["NX_class"] = "NXslit"
|
||||
slit_4.create_dataset("material", data="Ge")
|
||||
slit_4.create_dataset("description", data="Slit 4, experimental hutch, exposure box")
|
||||
self._slit_blades(slit_4, "sl4")
|
||||
|
||||
crystal_2 = mono.create_group("crystal_2")
|
||||
crystal_2.attrs["NX_class"] = "NXcrystal"
|
||||
crystal_2.create_dataset(name="usage", data="Bragg")
|
||||
crystal_2.create_dataset(name="order_no", data="2")
|
||||
crystal_2.create_dataset(name="reflection", data="[1 1 1]")
|
||||
bragg_angle = crystal_2.create_dataset(name="bragg_angle", data=get_entry(data, "moth1"))
|
||||
bragg_angle.attrs["units"] = "degrees"
|
||||
bend_x = crystal_2.create_dataset(name="bend_x", data=get_entry(data, "mobd"))
|
||||
bend_x.attrs["units"] = "degrees"
|
||||
# ── Slit 5 (experimental hutch, exposure box) ─────────────────────────
|
||||
slit_5 = instrument.create_group("slit_5")
|
||||
slit_5.attrs["NX_class"] = "NXslit"
|
||||
slit_5.create_dataset("material", data="Si")
|
||||
slit_5.create_dataset("description", data="Slit 5, experimental hutch, exposure box")
|
||||
self._slit_blades(slit_5, "sl5")
|
||||
|
||||
xbpm4 = instrument.create_group("XBPM4")
|
||||
xbpm4.attrs["NX_class"] = "NXdetector"
|
||||
xbpm4_sum = xbpm4.create_group("XBPM4_sum")
|
||||
xbpm4_sum_data = xbpm4_sum.create_dataset(name="data", data=get_entry(data, "bpm4s"))
|
||||
xbpm4_sum_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
xbpm4_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.")
|
||||
xbpm4_x = xbpm4.create_group("XBPM4_x")
|
||||
xbpm4_x_data = xbpm4_x.create_dataset(name="data", data=get_entry(data, "bpm4x"))
|
||||
xbpm4_x_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
xbpm4_x.create_dataset(
|
||||
name="description", data="Normalized difference of counts between left and right quadrants."
|
||||
)
|
||||
xbpm4_y = xbpm4.create_group("XBPM4_y")
|
||||
xbpm4_y_data = xbpm4_y.create_dataset(name="data", data=get_entry(data, "bpm4y"))
|
||||
xbpm4_y_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
xbpm4_y.create_dataset(
|
||||
name="description", data="Normalized difference of counts between high and low quadrants."
|
||||
)
|
||||
xbpm4_skew = xbpm4.create_group("XBPM4_skew")
|
||||
xbpm4_skew_data = xbpm4_skew.create_dataset(name="data", data=get_entry(data, "bpm4z"))
|
||||
xbpm4_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
xbpm4_skew.create_dataset(
|
||||
name="description", data="Normalized difference of counts between diagonal quadrants."
|
||||
)
|
||||
# ── Beam stop 1 ────────────────────────────────────────────────────────
|
||||
beam_stop_1 = instrument.create_group("beam_stop_1")
|
||||
beam_stop_1.attrs["NX_class"] = "NXbeam_stop"
|
||||
beam_stop_1.create_dataset("description", data="circular")
|
||||
beam_stop_1.create_dataset("size", data=3.0).attrs["units"] = "mm"
|
||||
self._safe_dataset(beam_stop_1, "x", "bs1x", units="mm")
|
||||
self._safe_dataset(beam_stop_1, "y", "bs1y", units="mm")
|
||||
# TODO: diode signal behind beam stop 1 when device is available
|
||||
|
||||
mirror = instrument.create_group("mirror")
|
||||
mirror.attrs["NX_class"] = "NXmirror"
|
||||
mirror.create_dataset(name="type", data="single")
|
||||
mirror.create_dataset(
|
||||
name="description",
|
||||
data="Grazing incidence mirror to reject high-harmonic wavelengths from the monochromator. There are three coating options available that are used depending on the X-ray energy, no coating (SiO2), rhodium (Rh) or platinum (Pt).",
|
||||
)
|
||||
incident_angle = mirror.create_dataset(name="incident_angle", data=get_entry(data, "mith"))
|
||||
incident_angle.attrs["units"] = "degrees"
|
||||
substrate_material = mirror.create_dataset(name="substrate_material", data="SiO2")
|
||||
substrate_material.attrs["units"] = "NX_CHAR"
|
||||
coating_material = mirror.create_dataset(name="coating_material", data="SiO2")
|
||||
coating_material.attrs["units"] = "NX_CHAR"
|
||||
bend_y = mirror.create_dataset(name="bend_y", data="mibd")
|
||||
bend_y.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
distance = mirror.create_dataset(
|
||||
name="distance", data=-4370 - np.asarray(get_entry(data, "samz", 0))
|
||||
)
|
||||
distance.attrs["units"] = "mm"
|
||||
# ── Beam stop 2 ────────────────────────────────────────────────────────
|
||||
beam_stop_2 = instrument.create_group("beam_stop_2")
|
||||
beam_stop_2.attrs["NX_class"] = "NXbeam_stop"
|
||||
beam_stop_2.create_dataset("description", data="rectangular")
|
||||
beam_stop_2.create_dataset("size_x", data=5.0).attrs["units"] = "mm"
|
||||
beam_stop_2.create_dataset("size_y", data=2.25).attrs["units"] = "mm"
|
||||
self._safe_dataset(beam_stop_2, "x", "bs2x", units="mm")
|
||||
self._safe_dataset(beam_stop_2, "y", "bs2y", units="mm")
|
||||
# TODO: diode (transmitted signal) when device is available
|
||||
|
||||
xbpm5 = instrument.create_group("XBPM5")
|
||||
xbpm5.attrs["NX_class"] = "NXdetector"
|
||||
xbpm5_sum = xbpm5.create_group("XBPM5_sum")
|
||||
xbpm5_sum_data = xbpm5_sum.create_dataset(name="data", data=get_entry(data, "bpm5s"))
|
||||
xbpm5_sum_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
xbpm5_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.")
|
||||
xbpm5_x = xbpm5.create_group("XBPM5_x")
|
||||
xbpm5_x_data = xbpm5_x.create_dataset(name="data", data=get_entry(data, "bpm5x"))
|
||||
xbpm5_x_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
xbpm5_x.create_dataset(
|
||||
name="description", data="Normalized difference of counts between left and right quadrants."
|
||||
)
|
||||
xbpm5_y = xbpm5.create_group("XBPM5_y")
|
||||
xbpm5_y_data = xbpm5_y.create_dataset(name="data", data=get_entry(data, "bpm5y"))
|
||||
xbpm5_y_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
xbpm5_y.create_dataset(
|
||||
name="description", data="Normalized difference of counts between high and low quadrants."
|
||||
)
|
||||
xbpm5_skew = xbpm5.create_group("XBPM5_skew")
|
||||
xbpm5_skew_data = xbpm5_skew.create_dataset(name="data", data=get_entry(data, "bpm5z"))
|
||||
xbpm5_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
xbpm5_skew.create_dataset(
|
||||
name="description", data="Normalized difference of counts between diagonal quadrants."
|
||||
)
|
||||
|
||||
slit_2 = instrument.create_group("slit_2")
|
||||
slit_2.attrs["NX_class"] = "NXslit"
|
||||
source.create_dataset(name="material", data="Ag")
|
||||
source.create_dataset(name="description", data="Slit 2, optics hutch")
|
||||
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl2wh"))
|
||||
x_gap.attrs["units"] = "mm"
|
||||
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl2wv"))
|
||||
y_gap.attrs["units"] = "mm"
|
||||
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl2ch"))
|
||||
x_translation.attrs["units"] = "mm"
|
||||
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl2cv"))
|
||||
height.attrs["units"] = "mm"
|
||||
distance = source.create_dataset(
|
||||
name="distance", data=-3140 - np.asarray(get_entry(data, "samz", 0))
|
||||
)
|
||||
distance.attrs["units"] = "mm"
|
||||
|
||||
slit_3 = instrument.create_group("slit_3")
|
||||
slit_3.attrs["NX_class"] = "NXslit"
|
||||
source.create_dataset(name="material", data="Si")
|
||||
source.create_dataset(name="description", data="Slit 3, experimental hutch, exposure box")
|
||||
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl3wh"))
|
||||
x_gap.attrs["units"] = "mm"
|
||||
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl3wv"))
|
||||
y_gap.attrs["units"] = "mm"
|
||||
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl3ch"))
|
||||
x_translation.attrs["units"] = "mm"
|
||||
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl3cv"))
|
||||
height.attrs["units"] = "mm"
|
||||
# distance = source.create_dataset(name="distance", data=-3140 - get_entry(data, "samz", 0))
|
||||
# distance.attrs["units"] = "mm"
|
||||
|
||||
filter_set = instrument.create_group("filter_set")
|
||||
filter_set.attrs["NX_class"] = "NXattenuator"
|
||||
filter_set.create_dataset(name="material", data="Si")
|
||||
filter_set.create_dataset(
|
||||
name="description",
|
||||
data="The filter set consists of 4 linear stages, each with five filter positions. Additionally, each one allows for an out position to allow 'no filtering'.",
|
||||
)
|
||||
attenuator_transmission = filter_set.create_dataset(
|
||||
name="attenuator_transmission", data=10 ** get_entry(data, "ftrans", 0)
|
||||
)
|
||||
attenuator_transmission.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
|
||||
slit_4 = instrument.create_group("slit_4")
|
||||
slit_4.attrs["NX_class"] = "NXslit"
|
||||
source.create_dataset(name="material", data="Si")
|
||||
source.create_dataset(name="description", data="Slit 4, experimental hutch, exposure box")
|
||||
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl4wh"))
|
||||
x_gap.attrs["units"] = "mm"
|
||||
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl4wv"))
|
||||
y_gap.attrs["units"] = "mm"
|
||||
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl4ch"))
|
||||
x_translation.attrs["units"] = "mm"
|
||||
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl4cv"))
|
||||
height.attrs["units"] = "mm"
|
||||
# distance = source.create_dataset(name="distance", data=-3140 - get_entry(data, "samz", 0))
|
||||
# distance.attrs["units"] = "mm"
|
||||
|
||||
slit_5 = instrument.create_group("slit_5")
|
||||
slit_5.attrs["NX_class"] = "NXslit"
|
||||
source.create_dataset(name="material", data="Si")
|
||||
source.create_dataset(name="description", data="Slit 5, experimental hutch, exposure box")
|
||||
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl5wh"))
|
||||
x_gap.attrs["units"] = "mm"
|
||||
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl5wv"))
|
||||
y_gap.attrs["units"] = "mm"
|
||||
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl5ch"))
|
||||
x_translation.attrs["units"] = "mm"
|
||||
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl5cv"))
|
||||
height.attrs["units"] = "mm"
|
||||
# distance = source.create_dataset(name="distance", data=-3140 - get_entry(data, "samz", 0))
|
||||
# distance.attrs["units"] = "mm"
|
||||
|
||||
beam_stop_1 = instrument.create_group("beam_stop_1")
|
||||
beam_stop_1.attrs["NX_class"] = "NX_beamstop"
|
||||
beam_stop_1.create_dataset(name="description", data="circular")
|
||||
bms1_size = beam_stop_1.create_dataset(name="size", data=3)
|
||||
bms1_size.attrs["units"] = "mm"
|
||||
bms1_x = beam_stop_1.create_dataset(name="size", data=get_entry(data, "bs1x"))
|
||||
bms1_x.attrs["units"] = "mm"
|
||||
bms1_y = beam_stop_1.create_dataset(name="size", data=get_entry(data, "bs1y"))
|
||||
bms1_y.attrs["units"] = "mm"
|
||||
|
||||
beam_stop_2 = instrument.create_group("beam_stop_2")
|
||||
beam_stop_2.attrs["NX_class"] = "NX_beamstop"
|
||||
beam_stop_2.create_dataset(name="description", data="rectangular")
|
||||
bms2_size_x = beam_stop_2.create_dataset(name="size_x", data=5)
|
||||
bms2_size_x.attrs["units"] = "mm"
|
||||
bms2_size_y = beam_stop_2.create_dataset(name="size_y", data=2.25)
|
||||
bms2_size_y.attrs["units"] = "mm"
|
||||
bms2_x = beam_stop_2.create_dataset(name="size", data=get_entry(data, "bs2x"))
|
||||
bms2_x.attrs["units"] = "mm"
|
||||
bms2_y = beam_stop_2.create_dataset(name="size", data=get_entry(data, "bs2y"))
|
||||
bms2_y.attrs["units"] = "mm"
|
||||
bms2_data = beam_stop_2.create_dataset(name="data", data=get_entry(data, "diode"))
|
||||
bms2_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
|
||||
if "eiger1p5m" in device_manager.devices and device_manager.devices.eiger1p5m.enabled:
|
||||
eiger_4 = instrument.create_group("eiger_4")
|
||||
eiger_4.attrs["NX_class"] = "NXdetector"
|
||||
x_pixel_size = eiger_4.create_dataset(name="x_pixel_size", data=75)
|
||||
x_pixel_size.attrs["units"] = "um"
|
||||
y_pixel_size = eiger_4.create_dataset(name="y_pixel_size", data=75)
|
||||
y_pixel_size.attrs["units"] = "um"
|
||||
polar_angle = eiger_4.create_dataset(name="polar_angle", data=0)
|
||||
polar_angle.attrs["units"] = "degrees"
|
||||
azimuthal_angle = eiger_4.create_dataset(name="azimuthal_angle", data=0)
|
||||
azimuthal_angle.attrs["units"] = "degrees"
|
||||
rotation_angle = eiger_4.create_dataset(name="rotation_angle", data=0)
|
||||
rotation_angle.attrs["units"] = "degrees"
|
||||
description = eiger_4.create_dataset(
|
||||
name="description", data="Single-photon counting detector, 320 micron-thick Si chip"
|
||||
)
|
||||
orientation = eiger_4.create_group("orientation")
|
||||
orientation.attrs["description"] = (
|
||||
"Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
|
||||
)
|
||||
orientation.create_dataset(name="transpose", data=1)
|
||||
orientation.create_dataset(name="rot90", data=3)
|
||||
|
||||
if (
|
||||
"eiger9m" in device_manager.devices
|
||||
and device_manager.devices.eiger9m.enabled
|
||||
and "eiger9m" in file_references
|
||||
):
|
||||
eiger9m = instrument.create_group("eiger9m")
|
||||
eiger9m.attrs["NX_class"] = "NXdetector"
|
||||
x_pixel_size = eiger9m.create_dataset(name="x_pixel_size", data=75)
|
||||
x_pixel_size.attrs["units"] = "um"
|
||||
y_pixel_size = eiger9m.create_dataset(name="y_pixel_size", data=75)
|
||||
y_pixel_size.attrs["units"] = "um"
|
||||
polar_angle = eiger9m.create_dataset(name="polar_angle", data=0)
|
||||
polar_angle.attrs["units"] = "degrees"
|
||||
azimuthal_angle = eiger9m.create_dataset(name="azimuthal_angle", data=0)
|
||||
azimuthal_angle.attrs["units"] = "degrees"
|
||||
rotation_angle = eiger9m.create_dataset(name="rotation_angle", data=0)
|
||||
rotation_angle.attrs["units"] = "degrees"
|
||||
description = eiger9m.create_dataset(
|
||||
name="description", data="Eiger9M detector, in-house developed, Paul Scherrer Institute"
|
||||
)
|
||||
orientation = eiger9m.create_group("orientation")
|
||||
orientation.attrs["description"] = (
|
||||
"Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
|
||||
)
|
||||
orientation.create_dataset(name="transpose", data=1)
|
||||
orientation.create_dataset(name="rot90", data=3)
|
||||
data = eiger9m.create_ext_link("data", file_references["eiger9m"]["path"], "EG9M/data")
|
||||
status = eiger9m.create_ext_link(
|
||||
"status", file_references["eiger9m"]["path"], "EG9M/status"
|
||||
# ── Detector translation ───────────────────────────────────────────────
|
||||
self._safe_dataset(
|
||||
instrument, "detector_translation_x", "dettrx",
|
||||
units="mm", description="Detector x-translation stage",
|
||||
)
|
||||
|
||||
if (
|
||||
"pilatus_2" in device_manager.devices
|
||||
and device_manager.devices.pilatus_2.enabled
|
||||
and "pilatus_2" in file_references
|
||||
):
|
||||
pilatus_2 = instrument.create_group("pilatus_2")
|
||||
pilatus_2.attrs["NX_class"] = "NXdetector"
|
||||
x_pixel_size = pilatus_2.create_dataset(name="x_pixel_size", data=172)
|
||||
x_pixel_size.attrs["units"] = "um"
|
||||
y_pixel_size = pilatus_2.create_dataset(name="y_pixel_size", data=172)
|
||||
y_pixel_size.attrs["units"] = "um"
|
||||
polar_angle = pilatus_2.create_dataset(name="polar_angle", data=0)
|
||||
polar_angle.attrs["units"] = "degrees"
|
||||
azimuthal_angle = pilatus_2.create_dataset(name="azimuthal_angle", data=0)
|
||||
azimuthal_angle.attrs["units"] = "degrees"
|
||||
rotation_angle = pilatus_2.create_dataset(name="rotation_angle", data=0)
|
||||
rotation_angle.attrs["units"] = "degrees"
|
||||
description = pilatus_2.create_dataset(
|
||||
name="description", data="Pilatus 300K detector, Dectris, Switzerland"
|
||||
)
|
||||
orientation = pilatus_2.create_group("orientation")
|
||||
orientation.attrs["description"] = (
|
||||
"Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
|
||||
)
|
||||
orientation.create_dataset(name="transpose", data=1)
|
||||
orientation.create_dataset(name="rot90", data=2)
|
||||
data = pilatus_2.create_ext_link(
|
||||
"data", file_references["pilatus_2"]["path"], "entry/instrument/pilatus_2/data"
|
||||
)
|
||||
# ── Eiger 1.5M detector ───────────────────────────────────────────────
|
||||
if (
|
||||
"eiger_1_5" in self.device_manager.devices
|
||||
and self.device_manager.devices.eiger_1_5.enabled
|
||||
and "eiger_1_5" in self.file_references
|
||||
):
|
||||
eiger = instrument.create_group("eiger_1_5")
|
||||
eiger.attrs["NX_class"] = "NXdetector"
|
||||
eiger.create_dataset("x_pixel_size", data=75.0).attrs["units"] = "um"
|
||||
eiger.create_dataset("y_pixel_size", data=75.0).attrs["units"] = "um"
|
||||
eiger.create_dataset("polar_angle", data=0.0).attrs["units"] = "degrees"
|
||||
eiger.create_dataset("azimuthal_angle", data=0.0).attrs["units"] = "degrees"
|
||||
eiger.create_dataset("rotation_angle", data=0.0).attrs["units"] = "degrees"
|
||||
eiger.create_dataset(
|
||||
"description",
|
||||
data="Eiger 1.5M detector, in-house developed, Paul Scherrer Institute",
|
||||
)
|
||||
eiger.create_dataset(
|
||||
"type",
|
||||
data="Single-photon counting detector, 320 micron-thick Si chip",
|
||||
)
|
||||
orientation = eiger.create_group("orientation")
|
||||
orientation.attrs["description"] = (
|
||||
"Orientation defines the number of counterclockwise rotations by 90 deg "
|
||||
"followed by a transposition to reach the 'cameraman orientation', "
|
||||
"looking towards the beam."
|
||||
)
|
||||
orientation.create_dataset("transpose", data=1)
|
||||
orientation.create_dataset("rot90", data=3)
|
||||
# Soft-link recorded frame data from the BEC collection
|
||||
try:
|
||||
for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys():
|
||||
self._safe_soft_link(eiger, k, f"{EIGER_COLL}/{k}")
|
||||
except Exception:
|
||||
pass
|
||||
# External link to pixel mask in the Eiger master file
|
||||
try:
|
||||
eiger.create_ext_link(
|
||||
"pixel_mask",
|
||||
self.file_references["eiger_1_5"].file_path,
|
||||
"/entry/instrument/detector/pixel_mask",
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if (
|
||||
"falcon" in device_manager.devices
|
||||
and device_manager.devices.falcon.enabled
|
||||
and "falcon" in file_references
|
||||
):
|
||||
falcon = instrument.create_ext_link(
|
||||
"falcon", file_references["falcon"]["path"], "entry/instrument/FalconX1"
|
||||
)
|
||||
# ── MCS (multi-channel scaler) ─────────────────────────────────────────
|
||||
if (
|
||||
"mcs" in self.device_manager.devices
|
||||
and self.device_manager.devices.mcs.enabled
|
||||
):
|
||||
mcs_group = instrument.create_group("mcs")
|
||||
mcs_group.attrs["NX_class"] = "NXdetector"
|
||||
mcs_group.attrs["description"] = "MCS card cSAXS — multi-channel scaler"
|
||||
self._safe_soft_link(mcs_group, "data", "/entry/collection/devices/mcs")
|
||||
|
||||
return storage
|
||||
# ── flOMNI ────────────────────────────────────────────────────────────
|
||||
# flomni_samples is used as the sentinel for the entire flOMNI setup.
|
||||
# If it is absent from the device_manager (removed from config) the
|
||||
# whole group is omitted. Individual datasets inside are still each
|
||||
# guarded by _safe_dataset / _safe_soft_link in case a specific motor
|
||||
# is temporarily disabled without removing the full setup.
|
||||
if "flomni_samples" in self.device_manager.devices:
|
||||
flomni = instrument.create_group("flOMNI")
|
||||
flomni.attrs["NX_class"] = "NXpositioner"
|
||||
flomni.attrs["description"] = "flOMNI flexible tOMography Nano Imaging"
|
||||
|
||||
# Galil motors — coarse sample stage
|
||||
self._safe_dataset(flomni, "fsamx", "fsamx", units="mm", description="Sample coarse X")
|
||||
self._safe_dataset(flomni, "fsamy", "fsamy", units="mm", description="Sample coarse Y")
|
||||
self._safe_dataset(flomni, "fsamroy", "fsamroy", units="degrees", description="Sample rotation")
|
||||
|
||||
# Galil motors — sample transfer / tray
|
||||
self._safe_dataset(flomni, "ftransx", "ftransx", units="mm", description="Sample transfer X")
|
||||
self._safe_dataset(flomni, "ftransy", "ftransy", units="mm", description="Sample transfer Y")
|
||||
self._safe_dataset(flomni, "ftransz", "ftransz", units="mm", description="Sample transfer Z")
|
||||
self._safe_dataset(flomni, "ftray", "ftray", units="mm", description="Sample transfer tray")
|
||||
|
||||
# Galil motors — laser tracker
|
||||
self._safe_dataset(flomni, "ftracky", "ftracky", units="mm", description="Laser tracker coarse Y")
|
||||
self._safe_dataset(flomni, "ftrackz", "ftrackz", units="mm", description="Laser tracker coarse Z")
|
||||
|
||||
# Galil motors — X-ray eye
|
||||
self._safe_dataset(flomni, "feyex", "feyex", units="mm", description="X-ray eye X")
|
||||
self._safe_dataset(flomni, "feyey", "feyey", units="mm", description="X-ray eye Y")
|
||||
|
||||
# Galil motors — optics (zone plate)
|
||||
self._safe_dataset(flomni, "foptx", "foptx", units="mm", description="Optics X")
|
||||
self._safe_dataset(flomni, "fopty", "fopty", units="mm", description="Optics Y")
|
||||
self._safe_dataset(flomni, "foptz", "foptz", units="mm", description="Optics Z")
|
||||
|
||||
# Galil motor — heater
|
||||
self._safe_dataset(flomni, "fheater", "fheater", units="mm", description="Heater Y")
|
||||
|
||||
# Smaract motors — OSA (order-sorting aperture)
|
||||
self._safe_dataset(flomni, "fosax", "fosax", units="mm", description="OSA X")
|
||||
self._safe_dataset(flomni, "fosay", "fosay", units="mm", description="OSA Y")
|
||||
self._safe_dataset(flomni, "fosaz", "fosaz", units="mm", description="OSA Z")
|
||||
|
||||
# Temperature and humidity sensor (soft link to BEC collection entry)
|
||||
self._safe_soft_link(
|
||||
flomni, "flomni_temphum",
|
||||
"/entry/collection/devices/flomni_temphum",
|
||||
)
|
||||
|
||||
# Real-time encoder positions (RtFlomniFlyer)
|
||||
# Single soft link to the entire rt_positions folder in the BEC
|
||||
# collection. This is the primary scan coordinate for ptychography.
|
||||
self._safe_soft_link(
|
||||
flomni, "rt_positions",
|
||||
"/entry/collection/devices/rt_positions",
|
||||
)
|
||||
|
||||
@@ -27,20 +27,19 @@ from bec_lib import bec_logger, messages
|
||||
from bec_lib.alarm_handler import Alarms
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_server.scan_server.errors import ScanAbortion
|
||||
from bec_server.scan_server.scans import SyncFlyScanBase
|
||||
from bec_server.scan_server.scans import AsyncFlyScanBase
|
||||
|
||||
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import TRIGGERSOURCE
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class FlomniFermatScan(SyncFlyScanBase):
|
||||
class FlomniFermatScan(AsyncFlyScanBase):
|
||||
scan_name = "flomni_fermat_scan"
|
||||
scan_type = "fly"
|
||||
required_kwargs = ["fovx", "fovy", "exp_time", "step", "angle"]
|
||||
arg_input = {}
|
||||
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
|
||||
use_scan_progress_report = True
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -104,6 +103,14 @@ class FlomniFermatScan(SyncFlyScanBase):
|
||||
self.zshift = -100
|
||||
self.flomni_rotation_status = None
|
||||
|
||||
def scan_report_instructions(self):
|
||||
"""Scan report instructions for the progress bar"""
|
||||
yield from self.stubs.scan_report_instruction({"device_progress": ["rt_positions"]})
|
||||
|
||||
@property
|
||||
def monitor_sync(self) -> str:
|
||||
return "rt_positions"
|
||||
|
||||
def initialize(self):
|
||||
self.scan_motors = []
|
||||
self.update_readout_priority()
|
||||
@@ -113,10 +120,6 @@ class FlomniFermatScan(SyncFlyScanBase):
|
||||
self.positions, corridor_size=self.optim_trajectory_corridor
|
||||
)
|
||||
|
||||
@property
|
||||
def monitor_sync(self):
|
||||
return "rt_flomni"
|
||||
|
||||
def reverse_trajectory(self):
|
||||
"""
|
||||
Reverse the trajectory. Every other scan should be reversed to
|
||||
@@ -290,26 +293,18 @@ class FlomniFermatScan(SyncFlyScanBase):
|
||||
return np.array(positions)
|
||||
|
||||
def scan_core(self):
|
||||
# use a device message to receive the scan number and
|
||||
# scan ID before sending the message to the device server
|
||||
yield from self.stubs.kickoff(device="rtx")
|
||||
while True:
|
||||
yield from self.stubs.read(group="monitored")
|
||||
status = self.connector.get(MessageEndpoints.device_status("rt_scan"))
|
||||
if status:
|
||||
status_id = status.content.get("status", 1)
|
||||
request_id = status.metadata.get("RID")
|
||||
if status_id == 0 and self.metadata.get("RID") == request_id:
|
||||
break
|
||||
if status_id == 2 and self.metadata.get("RID") == request_id:
|
||||
raise ScanAbortion(
|
||||
"An error occured during the flomni readout:"
|
||||
f" {status.metadata.get('error')}"
|
||||
)
|
||||
# send off the flyer
|
||||
yield from self.stubs.kickoff(device="rt_positions")
|
||||
|
||||
# start the readout loop of the flyer
|
||||
status = yield from self.stubs.complete(device="rt_positions", wait=False)
|
||||
|
||||
# read the monitors until the flyer is done
|
||||
while not status.done:
|
||||
yield from self.stubs.read(group="monitored", point_id=self.point_id)
|
||||
self.point_id += 1
|
||||
time.sleep(1)
|
||||
logger.debug("reading monitors")
|
||||
# yield from self.device_rpc("rtx", "controller.kickoff")
|
||||
|
||||
def move_to_start(self):
|
||||
"""return to the start position"""
|
||||
@@ -336,6 +331,7 @@ class FlomniFermatScan(SyncFlyScanBase):
|
||||
yield from self.read_scan_motors()
|
||||
self.prepare_positions()
|
||||
yield from self._prepare_setup()
|
||||
yield from self.scan_report_instructions()
|
||||
yield from self.open_scan()
|
||||
yield from self.stage()
|
||||
yield from self.run_baseline_reading()
|
||||
|
||||
@@ -217,6 +217,16 @@ def test_mcs_card_csaxs_complete_and_stop(mock_mcs_csaxs: MCSCardCSAXS):
|
||||
assert not mcs._start_monitor_async_data_emission.is_set()
|
||||
|
||||
|
||||
def test_mcs_on_stop(mock_mcs_csaxs: MCSCardCSAXS):
|
||||
"""Test that on stop sets the omit_mca_callbacks flag. Also test that on stage clears the omit_mca_callbacks flag."""
|
||||
mcs = mock_mcs_csaxs
|
||||
assert mcs._omit_mca_callbacks.is_set() is False
|
||||
mcs.stop()
|
||||
assert mcs._omit_mca_callbacks.is_set() is True
|
||||
mcs.stage()
|
||||
assert mcs._omit_mca_callbacks.is_set() is False
|
||||
|
||||
|
||||
def test_mcs_recovery(mock_mcs_csaxs: MCSCardCSAXS):
|
||||
mcs = mock_mcs_csaxs
|
||||
# Simulate ongoing acquisition
|
||||
|
||||
69
tests/tests_devices/test_omny_shutter.py
Normal file
69
tests/tests_devices/test_omny_shutter.py
Normal file
@@ -0,0 +1,69 @@
|
||||
import pytest
|
||||
from bec_server.device_server.tests.utils import DMMock
|
||||
|
||||
from csaxs_bec.devices.omny.shutter import MonitorSignal, OMNYFastShutter
|
||||
|
||||
|
||||
@pytest.mark.parametrize("auto_monitor", [False, True])
|
||||
def test_monitor_signal_stores_auto_monitor(auto_monitor):
|
||||
signal = MonitorSignal(name="signal", auto_monitor=auto_monitor)
|
||||
|
||||
assert signal.auto_monitor is auto_monitor
|
||||
|
||||
|
||||
def test_monitor_signal_put_propagates_value_to_readback_callback():
|
||||
signal = MonitorSignal(name="signal", auto_monitor=True)
|
||||
initial_value = signal.read()[signal.name]["value"]
|
||||
callback_values = []
|
||||
callback_reads = []
|
||||
|
||||
def _test_cb(value, old_value, **kwargs):
|
||||
callback_values.append((value, old_value))
|
||||
callback_reads.append(kwargs["obj"].read())
|
||||
|
||||
signal.subscribe(_test_cb, event_type=signal.SUB_VALUE, run=False)
|
||||
|
||||
signal.put(1)
|
||||
|
||||
assert callback_values == [(1, initial_value)]
|
||||
assert len(callback_reads) == 1
|
||||
assert callback_reads[0][signal.name]["value"] == 1
|
||||
assert signal.read()[signal.name]["value"] == 1
|
||||
|
||||
signal.put(0)
|
||||
assert callback_values == [(1, initial_value), (0, 1)]
|
||||
assert len(callback_reads) == 2
|
||||
assert callback_reads[1][signal.name]["value"] == 0
|
||||
assert signal.read()[signal.name]["value"] == 0
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def omny_fast_shutter():
|
||||
shutter = OMNYFastShutter(name="omny_fast_shutter", device_manager=DMMock())
|
||||
|
||||
try:
|
||||
yield shutter
|
||||
finally:
|
||||
shutter.destroy()
|
||||
|
||||
|
||||
def test_omny_fast_shutter_uses_monitor_signal_with_auto_monitor(omny_fast_shutter):
|
||||
assert isinstance(omny_fast_shutter.shutter, MonitorSignal)
|
||||
assert omny_fast_shutter.shutter.auto_monitor is True
|
||||
|
||||
|
||||
def test_omny_fast_shutter_propagates_signal_changes_to_device_readback(omny_fast_shutter):
|
||||
signal_name = omny_fast_shutter.shutter.name
|
||||
callback_reads = []
|
||||
|
||||
def _test_cb(**kwargs):
|
||||
callback_reads.append(omny_fast_shutter.read())
|
||||
|
||||
omny_fast_shutter.shutter.subscribe(_test_cb, event_type=omny_fast_shutter.shutter.SUB_VALUE, run=False)
|
||||
|
||||
omny_fast_shutter.shutter.put(1)
|
||||
|
||||
assert len(callback_reads) == 1
|
||||
assert callback_reads[0][signal_name]["value"] == 1
|
||||
assert omny_fast_shutter.read()[signal_name]["value"] == 1
|
||||
assert omny_fast_shutter.fshstatus() == 1
|
||||
241
tests/tests_devices/test_pseudo_devices.py
Normal file
241
tests/tests_devices/test_pseudo_devices.py
Normal file
@@ -0,0 +1,241 @@
|
||||
"""Module to test the pseudo_device module."""
|
||||
|
||||
import pytest
|
||||
from bec_lib.atlas_models import Device
|
||||
from ophyd_devices.sim.sim_signals import SetableSignal
|
||||
|
||||
from csaxs_bec.devices.pseudo_devices.bpm import BPM
|
||||
from csaxs_bec.devices.pseudo_devices.bpm_control import _GAIN_TO_BITS, BPMControl
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patched_dm(dm_with_devices):
|
||||
# Patch missing current_session attribute in the device manager
|
||||
dm = dm_with_devices
|
||||
setattr(dm, "current_session", dm._session)
|
||||
#
|
||||
signal_lsb = SetableSignal(name="gain_lsb", value=0, kind="config")
|
||||
signal_mid = SetableSignal(name="gain_mid", value=0, kind="config")
|
||||
signal_msb = SetableSignal(name="gain_msb", value=0, kind="config")
|
||||
signal_coupling = SetableSignal(name="coupling", value=0, kind="config")
|
||||
signal_speed = SetableSignal(name="speed_mode", value=0, kind="config")
|
||||
for signal in [signal_lsb, signal_mid, signal_msb, signal_coupling, signal_speed]:
|
||||
dev_cfg = Device(
|
||||
name=signal.name,
|
||||
deviceClass="ophyd_devices.sim.sim_signals.SetableSignal",
|
||||
enabled=True,
|
||||
readoutPriority="baseline",
|
||||
)
|
||||
dm._session["devices"].append(dev_cfg.model_dump())
|
||||
dm.devices._add_device(signal.name, signal)
|
||||
return dm
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bpm_control(patched_dm):
|
||||
name = "bpm_control"
|
||||
control_config = Device(
|
||||
name=name,
|
||||
deviceClass="csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl",
|
||||
enabled=True,
|
||||
readoutPriority="baseline",
|
||||
deviceConfig={
|
||||
"gain_lsb": "gain_lsb",
|
||||
"gain_mid": "gain_mid",
|
||||
"gain_msb": "gain_msb",
|
||||
"coupling": "coupling",
|
||||
"speed_mode": "speed_mode",
|
||||
},
|
||||
needs=["gain_lsb", "gain_mid", "gain_msb", "coupling", "speed_mode"],
|
||||
)
|
||||
patched_dm._session["devices"].append(control_config.model_dump())
|
||||
try:
|
||||
control = BPMControl(
|
||||
name=name,
|
||||
gain_lsb="gain_lsb",
|
||||
gain_mid="gain_mid",
|
||||
gain_msb="gain_msb",
|
||||
coupling="coupling",
|
||||
speed_mode="speed_mode",
|
||||
device_manager=patched_dm,
|
||||
)
|
||||
patched_dm.devices._add_device(control.name, control)
|
||||
control.wait_for_connection()
|
||||
yield control
|
||||
finally:
|
||||
control.destroy()
|
||||
|
||||
|
||||
def test_bpm_control_set_gain(bpm_control):
|
||||
gain_lsb = bpm_control.device_manager.devices["gain_lsb"]
|
||||
gain_mid = bpm_control.device_manager.devices["gain_mid"]
|
||||
gain_msb = bpm_control.device_manager.devices["gain_msb"]
|
||||
coupling = bpm_control.device_manager.devices["coupling"]
|
||||
speed_mode = bpm_control.device_manager.devices["speed_mode"]
|
||||
gain_lsb.put(0)
|
||||
gain_mid.put(0)
|
||||
gain_msb.put(0)
|
||||
coupling.put(0)
|
||||
speed_mode.put(1)
|
||||
|
||||
gain = bpm_control.gain.get()
|
||||
assert _GAIN_TO_BITS.get(gain) == (0, 0, 0, speed_mode.get() == 1)
|
||||
|
||||
gain_val = 10000000
|
||||
bpm_control.set_gain(gain_val)
|
||||
assert _GAIN_TO_BITS.get(gain_val, ()) == (
|
||||
gain_msb.get(),
|
||||
gain_mid.get(),
|
||||
gain_lsb.get(),
|
||||
speed_mode.get(),
|
||||
)
|
||||
|
||||
gain_val = 100000000000
|
||||
bpm_control.set_gain(gain_val)
|
||||
assert _GAIN_TO_BITS.get(gain_val, ()) == (
|
||||
gain_msb.get(),
|
||||
gain_mid.get(),
|
||||
gain_lsb.get(),
|
||||
speed_mode.get(),
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
bpm_control.set_gain(1005.0)
|
||||
|
||||
|
||||
def test_bpm_control_set_coupling(bpm_control):
|
||||
coupling = bpm_control.device_manager.devices["coupling"]
|
||||
coupling.put(0)
|
||||
|
||||
bpm_control.coupling.get() == "AC"
|
||||
coupling.put(1)
|
||||
bpm_control.coupling.get() == "DC"
|
||||
|
||||
bpm_control.set_coupling("AC")
|
||||
assert coupling.get() == 0
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
bpm_control.set_coupling("wrong")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patched_dm_bpm(dm_with_devices):
|
||||
# Patch missing current_session attribute in the device manager
|
||||
dm = dm_with_devices
|
||||
setattr(dm, "current_session", dm._session)
|
||||
#
|
||||
left_top = SetableSignal(name="left_top", value=0, kind="config")
|
||||
right_top = SetableSignal(name="right_top", value=0, kind="config")
|
||||
right_bot = SetableSignal(name="right_bot", value=0, kind="config")
|
||||
left_bot = SetableSignal(name="left_bot", value=0, kind="config")
|
||||
for signal in [left_top, right_top, right_bot, left_bot]:
|
||||
|
||||
dev_cfg = Device(
|
||||
name=signal.name,
|
||||
deviceClass="ophyd_devices.sim.sim_signals.SetableSignal",
|
||||
enabled=True,
|
||||
readoutPriority="baseline",
|
||||
)
|
||||
dm._session["devices"].append(dev_cfg.model_dump())
|
||||
dm.devices._add_device(signal.name, signal)
|
||||
return dm
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bpm(patched_dm_bpm):
|
||||
name = "bpm"
|
||||
bpm_config = Device(
|
||||
name=name,
|
||||
deviceClass="csaxs_bec.devices.pseudo_devices.bpm.BPM",
|
||||
enabled=True,
|
||||
readoutPriority="baseline",
|
||||
deviceConfig={
|
||||
"left_top": "left_top",
|
||||
"right_top": "right_top",
|
||||
"right_bot": "right_bot",
|
||||
"left_bot": "left_bot",
|
||||
},
|
||||
needs=["left_top", "right_top", "right_bot", "left_bot"],
|
||||
)
|
||||
patched_dm_bpm._session["devices"].append(bpm_config.model_dump())
|
||||
try:
|
||||
bpm = BPM(
|
||||
name=name,
|
||||
left_top="left_top",
|
||||
right_top="right_top",
|
||||
right_bot="right_bot",
|
||||
left_bot="left_bot",
|
||||
device_manager=patched_dm_bpm,
|
||||
)
|
||||
patched_dm_bpm.devices._add_device(bpm.name, bpm)
|
||||
bpm.wait_for_connection()
|
||||
yield bpm
|
||||
finally:
|
||||
bpm.destroy()
|
||||
|
||||
|
||||
def test_bpm_positions(bpm):
|
||||
left_top = bpm.device_manager.devices["left_top"]
|
||||
right_top = bpm.device_manager.devices["right_top"]
|
||||
right_bot = bpm.device_manager.devices["right_bot"]
|
||||
left_bot = bpm.device_manager.devices["left_bot"]
|
||||
|
||||
# Test center position
|
||||
for signal in [left_top, right_top, right_bot, left_bot]:
|
||||
signal.put(1)
|
||||
assert bpm.pos_x.get() == 0
|
||||
assert bpm.pos_y.get() == 0
|
||||
|
||||
# Test fully left
|
||||
left_top.put(1)
|
||||
right_top.put(0)
|
||||
right_bot.put(0)
|
||||
left_bot.put(1)
|
||||
assert bpm.pos_x.get() == -1
|
||||
assert bpm.pos_y.get() == 0
|
||||
assert bpm.diagonal.get() == 0
|
||||
assert bpm.intensity.get() == 2
|
||||
|
||||
# Test fully right
|
||||
left_top.put(0)
|
||||
right_top.put(1)
|
||||
right_bot.put(1)
|
||||
left_bot.put(0)
|
||||
assert bpm.pos_x.get() == 1
|
||||
assert bpm.pos_y.get() == 0
|
||||
assert bpm.diagonal.get() == 0
|
||||
|
||||
# Test fully top
|
||||
left_top.put(1)
|
||||
right_top.put(1)
|
||||
right_bot.put(0)
|
||||
left_bot.put(0)
|
||||
assert bpm.pos_x.get() == 0
|
||||
assert bpm.pos_y.get() == 1
|
||||
assert bpm.diagonal.get() == 0
|
||||
|
||||
# Test fully bottom
|
||||
left_top.put(0)
|
||||
right_top.put(0)
|
||||
right_bot.put(1)
|
||||
left_bot.put(1)
|
||||
assert bpm.pos_x.get() == 0
|
||||
assert bpm.pos_y.get() == -1
|
||||
assert bpm.diagonal.get() == 0
|
||||
|
||||
# Diagonal beam
|
||||
left_top.put(1)
|
||||
right_top.put(0)
|
||||
right_bot.put(1)
|
||||
left_bot.put(0)
|
||||
assert bpm.pos_x.get() == 0
|
||||
assert bpm.pos_y.get() == 0
|
||||
assert bpm.diagonal.get() == -1
|
||||
|
||||
left_top.put(0)
|
||||
right_top.put(1)
|
||||
right_bot.put(0)
|
||||
left_bot.put(1)
|
||||
assert bpm.pos_x.get() == 0
|
||||
assert bpm.pos_y.get() == 0
|
||||
assert bpm.diagonal.get() == 1
|
||||
Reference in New Issue
Block a user