Compare commits

...

23 Commits

Author SHA1 Message Date
x12sa
6f919ecfdf fixes in contrast and audio confirm logif
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m55s
2026-03-25 20:20:26 +01:00
x12sa
360fd03a73 webpage version2
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m55s
2026-03-25 17:46:14 +01:00
x12sa
9e84b8c510 first version of webpage
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m56s
2026-03-24 17:06:08 +01:00
x12sa
88df4781ec tags added
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m54s
2026-03-24 15:36:25 +01:00
x12sa
3b474c89c8 removed write subtomo to scilog 2026-03-24 15:34:36 +01:00
x12sa
68cc13e1d3 alignment scans to scilog 2026-03-24 15:33:14 +01:00
x12sa
700f3f9bb9 scilog tag added 2026-03-24 15:27:25 +01:00
x12sa
15a4d45f68 moved tomo_reconstruct to tomo_scan_projection
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m59s
2026-03-24 15:00:17 +01:00
x12sa
7c7f877d78 new logos for logbook 2026-03-24 14:59:50 +01:00
x12sa
5d61d756c9 logo and scilog newline fixed
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m56s
2026-03-23 17:00:28 +01:00
x12sa
b37ae3ef57 wip message to scilog when tomo starts
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m55s
2026-03-23 16:29:30 +01:00
x12sa
76ed858e5c added heartbeat, start and remaining time to progress
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-23 15:58:54 +01:00
x12sa
a0555def4d changed progress dict to global variable
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m56s
2026-03-23 15:48:17 +01:00
x12sa
c1ad2fc4c3 pdf status report fixes
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-23 12:38:37 +01:00
x12sa
9eee4ee1f7 minor fixes during testing
All checks were successful
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 2s
CI for csaxs_bec / test (push) Successful in 1m55s
2026-03-19 11:17:41 +01:00
c97b00cc8c fix: flomni async readout
All checks were successful
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 2s
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-19 11:13:26 +01:00
d6a4fd37fc fix(mcs): fix _progress_udpate
Some checks failed
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 2s
CI for csaxs_bec / test (push) Has been cancelled
2026-03-19 11:11:54 +01:00
6d4c9d90fc fix(mcs): omit_mca_callbacks if stop is called. 2026-03-19 11:11:54 +01:00
87163cc3f1 docs: run build on py 3-12
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m57s
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 4s
CI for csaxs_bec / test (push) Successful in 1m56s
2026-03-18 14:35:42 +01:00
7c17a3ae40 ci: add rtd workflow
Some checks failed
CI for csaxs_bec / test (push) Has been cancelled
2026-03-18 14:34:52 +01:00
663d22fff4 fix(flomni): indentation error in if statement
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m57s
CI for csaxs_bec / test (push) Successful in 1m55s
2026-03-18 14:02:44 +01:00
3ca29dd0dd fix(x_ray_eye): added 20s rpc timeout to most rpc calls 2026-03-18 13:57:08 +01:00
0ac37f538b fix(gui_tools): flomni gui tools timeout optimization 2026-03-18 13:57:06 +01:00
20 changed files with 2096 additions and 330 deletions

View File

@@ -0,0 +1,21 @@
name: Read the Docs Deploy Trigger
on:
push:
branches:
- main
workflow_dispatch:
jobs:
trigger-rtd-webhook:
runs-on: ubuntu-latest
steps:
- name: Trigger Read the Docs webhook
env:
RTD_TOKEN: ${{ secrets.RTD_TOKEN }}
run: |
curl --fail --show-error --silent \
-X POST \
-d "branches=${{ github.ref_name }}" \
-d "token=${RTD_TOKEN}" \
"https://readthedocs.org/api/v2/webhook/sls-csaxs/270162/"

View File

@@ -8,15 +8,14 @@ version: 2
build:
os: ubuntu-20.04
tools:
python: "3.10"
python: "3.12"
jobs:
pre_install:
- pip install .
# Build documentation in the docs/ directory with Sphinx
sphinx:
configuration: docs/conf.py
configuration: docs/conf.py
# If using Sphinx, optionally build your docs in additional formats such as PDF
# formats:
@@ -24,6 +23,5 @@ sphinx:
# Optionally declare the Python requirements required to build your docs
python:
install:
- requirements: docs/requirements.txt
install:
- requirements: docs/requirements.txt

Binary file not shown.

After

Width:  |  Height:  |  Size: 562 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 48 KiB

View File

@@ -0,0 +1,89 @@
"""
LamNI/webpage_generator.py
===========================
LamNI-specific webpage generator subclass.
Integration (inside the LamNI __init__ / startup):
---------------------------------------------------
from csaxs_bec.bec_ipython_client.plugins.LamNI.webpage_generator import (
LamniWebpageGenerator,
)
self._webpage_gen = LamniWebpageGenerator(
bec_client=client,
output_dir="~/data/raw/webpage/",
)
self._webpage_gen.start()
Or use the factory (auto-selects by session name "lamni"):
----------------------------------------------------------
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
make_webpage_generator,
)
self._webpage_gen = make_webpage_generator(bec, output_dir="~/data/raw/webpage/")
self._webpage_gen.start()
Interactive helpers:
--------------------
lamni._webpage_gen.status()
lamni._webpage_gen.verbosity = 2
lamni._webpage_gen.stop()
lamni._webpage_gen.start()
"""
from pathlib import Path
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
WebpageGeneratorBase,
_safe_get,
_safe_float,
_gvar,
)
class LamniWebpageGenerator(WebpageGeneratorBase):
"""
LamNI-specific webpage generator.
Logo: LamNI.png from the same directory as this file.
Override _collect_setup_data() to add LamNI-specific temperatures,
sample name, and measurement settings.
"""
# TODO: fill in LamNI-specific device paths
# label -> dotpath under device_manager.devices
_TEMP_MAP = {
# "Sample": "lamni_temphum.temperature_sample",
# "OSA": "lamni_temphum.temperature_osa",
}
def _logo_path(self):
return Path(__file__).parent / "LamNI.png"
def _collect_setup_data(self) -> dict:
# ── LamNI-specific data goes here ─────────────────────────────
# Uncomment and adapt when device names are known:
#
# dm = self._bec.device_manager
# sample_name = _safe_get(dm, "lamni_samples.sample_names.sample0") or "N/A"
# temperatures = {
# label: _safe_float(_safe_get(dm, path))
# for label, path in self._TEMP_MAP.items()
# }
# settings = {
# "Sample name": sample_name,
# "FOV x / y": ...,
# "Exposure time": _gvar(self._bec, "tomo_countingtime", ".3f", " s"),
# "Angle step": _gvar(self._bec, "tomo_angle_stepsize", ".2f", "\u00b0"),
# }
# return {
# "type": "lamni",
# "sample_name": sample_name,
# "temperatures": temperatures,
# "settings": settings,
# }
# Placeholder — returns minimal info until implemented
return {
"type": "lamni",
# LamNI-specific data here
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 124 KiB

View File

@@ -21,6 +21,14 @@ from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import (
TomoIDManager,
)
# from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
# FlomniWebpageGenerator,
# VERBOSITY_SILENT, # 0 — no output
# VERBOSITY_NORMAL, # 1 — startup / stop messages only (default)
# VERBOSITY_VERBOSE, # 2 — one-line summary per cycle
# VERBOSITY_DEBUG, # 3 — full JSON payload per cycle
# )
logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
@@ -28,6 +36,7 @@ if builtins.__dict__.get("bec") is not None:
dev = builtins.__dict__.get("dev")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
@@ -35,12 +44,15 @@ def umv(*args):
class FlomniToolsError(Exception):
pass
class FlomniInitError(Exception):
pass
class FlomniError(Exception):
pass
# class FlomniTools:
# def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
# if autoconfirm and default == "y":
@@ -84,13 +96,16 @@ class FlomniInitStagesMixin:
else:
return
sensor_voltage_target = dev.ftransy.user_parameter.get("sensor_voltage")
sensor_voltage = float(dev.ftransy.controller.socket_put_and_receive("MG@AN[1]").strip())
if not np.isclose(sensor_voltage, sensor_voltage_target, 0.25):
print(f"Sensor voltage of the gripper is {sensor_voltage}, while target from config is {sensor_voltage_target}")
print("Verify that the value is acceptable and update config file. Reload config and start again.")
print(
f"Sensor voltage of the gripper is {sensor_voltage}, while target from config is {sensor_voltage_target}"
)
print(
"Verify that the value is acceptable and update config file. Reload config and start again."
)
return
print("Starting to drive ftransy to +y limit")
@@ -118,8 +133,9 @@ class FlomniInitStagesMixin:
dev.feyex.limits = [-30, -1]
print("done")
if self.OMNYTools.yesno("Init of foptz. Can the stage move to the upstream limit without collision?"):
if self.OMNYTools.yesno(
"Init of foptz. Can the stage move to the upstream limit without collision?"
):
print("OK. continue.")
else:
return
@@ -173,7 +189,9 @@ class FlomniInitStagesMixin:
dev.fsamy.limits = [2, 3.1]
print("done")
if self.OMNYTools.yesno("Init of tracking stages. Did you remove the outer laser flight tubes?"):
if self.OMNYTools.yesno(
"Init of tracking stages. Did you remove the outer laser flight tubes?"
):
print("OK. continue.")
else:
print("Stopping.")
@@ -206,7 +224,9 @@ class FlomniInitStagesMixin:
print("done")
print("Initializing UPR stage.")
if self.OMNYTools.yesno("To ensure that the end switches work, please check that they are currently not pushed. Is everything okay?"):
if self.OMNYTools.yesno(
"To ensure that the end switches work, please check that they are currently not pushed. Is everything okay?"
):
print("OK. continue.")
else:
print("Stopping.")
@@ -246,7 +266,9 @@ class FlomniInitStagesMixin:
dev.fsamroy.limits = [-5, 365]
print("done")
if self.OMNYTools.yesno("Init of foptx. Can the stage move to the positive limit without collision? Attention: tracker flight tube!"):
if self.OMNYTools.yesno(
"Init of foptx. Can the stage move to the positive limit without collision? Attention: tracker flight tube!"
):
print("OK. continue.")
else:
print("Stopping.")
@@ -425,7 +447,7 @@ class FlomniSampleTransferMixin:
def ftransfer_flomni_stage_in(self):
time.sleep(1)
sample_in_position = dev.flomni_samples.is_sample_slot_used(0)
#bool(float(dev.flomni_samples.sample_placed.sample0.get()))
# bool(float(dev.flomni_samples.sample_placed.sample0.get()))
if not sample_in_position:
raise FlomniError("There is no sample in the sample stage. Aborting.")
self.reset_correction()
@@ -440,7 +462,10 @@ class FlomniSampleTransferMixin:
print("Moving X-ray eye in.")
if self.OMNYTools.yesno("Please confirm that this is ok with the flight tube. This check is to be removed after commissioning", "n"):
if self.OMNYTools.yesno(
"Please confirm that this is ok with the flight tube. This check is to be removed after commissioning",
"n",
):
print("OK. continue.")
else:
print("Stopping.")
@@ -451,7 +476,6 @@ class FlomniSampleTransferMixin:
self.foptics_out()
self.xrayeye_update_frame()
def laser_tracker_show_all(self):
dev.rtx.controller.laser_tracker_show_all()
@@ -554,7 +578,6 @@ class FlomniSampleTransferMixin:
self.flomnigui_show_cameras()
self.ftransfer_gripper_move(position)
self.ftransfer_controller_enable_mount_mode()
@@ -594,7 +617,7 @@ class FlomniSampleTransferMixin:
self.check_sensor_connected()
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
#bool(float(dev.flomni_samples.sample_in_gripper.get()))
# bool(float(dev.flomni_samples.sample_in_gripper.get()))
if not sample_in_gripper:
raise FlomniError("The gripper does not carry a sample.")
@@ -646,7 +669,7 @@ class FlomniSampleTransferMixin:
def ftransfer_sample_change(self, new_sample_position: int):
self.check_tray_in()
# sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
# sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
if sample_in_gripper:
raise FlomniError("There is already a sample in the gripper. Aborting.")
@@ -654,28 +677,30 @@ class FlomniSampleTransferMixin:
self.check_position_is_valid(new_sample_position)
if new_sample_position == 0:
raise FlomniError("The new sample to place cannot be the sample in the sample stage. Aborting.")
raise FlomniError(
"The new sample to place cannot be the sample in the sample stage. Aborting."
)
# sample_placed = getattr(
# dev.flomni_samples.sample_placed, f"sample{new_sample_position}"
# ).get()
# sample_placed = getattr(
# dev.flomni_samples.sample_placed, f"sample{new_sample_position}"
# ).get()
sample_placed = dev.flomni_samples.is_sample_slot_used(new_sample_position)
if not sample_placed:
raise FlomniError(
f"There is currently no sample in position [{new_sample_position}]. Aborting."
)
# sample_in_sample_stage = dev.flomni_samples.sample_placed.sample0.get()
# sample_in_sample_stage = dev.flomni_samples.sample_placed.sample0.get()
sample_in_sample_stage = dev.flomni_samples.is_sample_slot_used(0)
if sample_in_sample_stage:
# find a new home for the sample...
empty_slots = []
# for name, val in dev.flomni_samples.read().items():
# if "flomni_samples_sample_placed_sample" not in name:
# continue
# if val.get("value") == 0:
# empty_slots.append(int(name.split("flomni_samples_sample_placed_sample")[1]))
for j in range(1,20):
# for name, val in dev.flomni_samples.read().items():
# if "flomni_samples_sample_placed_sample" not in name:
# continue
# if val.get("value") == 0:
# empty_slots.append(int(name.split("flomni_samples_sample_placed_sample")[1]))
for j in range(1, 20):
if not dev.flomni_samples.is_sample_slot_used(j):
empty_slots.append(j)
if not empty_slots:
@@ -761,7 +786,9 @@ class FlomniSampleTransferMixin:
dev.ftransy.controller.socket_put_confirmed("confirm=1")
else:
print("Stopping.")
exit
raise FlomniError(
"User abort sample transfer."
)
def ftransfer_gripper_is_open(self) -> bool:
status = bool(float(dev.ftransy.controller.socket_put_and_receive("MG @OUT[9]").strip()))
@@ -769,7 +796,7 @@ class FlomniSampleTransferMixin:
def ftransfer_gripper_open(self):
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
#dev.flomni_samples.sample_in_gripper.get()
# dev.flomni_samples.sample_in_gripper.get()
if sample_in_gripper:
raise FlomniError(
"Cannot open gripper. There is still a sample in the gripper! Aborting."
@@ -784,13 +811,17 @@ class FlomniSampleTransferMixin:
def ftransfer_gripper_move(self, position: int):
self.check_position_is_valid(position)
self._ftransfer_shiftx = -0.2
#this is not used for sample stage position!
self._ftransfer_shiftx = -0.15
self._ftransfer_shiftz = -0.5
fsamx_pos = dev.fsamx.readback.get()
if position == 0 and fsamx_pos > -160:
if self.OMNYTools.yesno("May the flomni stage be moved out for the sample change? Feedback will be disabled and alignment will be lost!", "y"):
if self.OMNYTools.yesno(
"May the flomni stage be moved out for the sample change? Feedback will be disabled and alignment will be lost!",
"y",
):
print("OK. continue.")
self.ftransfer_flomni_stage_out()
else:
@@ -801,7 +832,7 @@ class FlomniSampleTransferMixin:
self.check_tray_in()
if position == 0:
umv(dev.ftransx, 10.715 + 0.2, dev.ftransz, 3.5950)
umv(dev.ftransx, 11, dev.ftransz, 3.5950)
if position == 1:
umv(
dev.ftransx,
@@ -946,8 +977,6 @@ class FlomniSampleTransferMixin:
class FlomniAlignmentMixin:
import csaxs_bec
import os
from pathlib import Path
# Ensure this is a Path object, not a string
csaxs_bec_basepath = Path(csaxs_bec.__file__)
@@ -956,10 +985,13 @@ class FlomniAlignmentMixin:
# Build the absolute path correctly
default_correction_file = (
csaxs_bec_basepath.parent / 'bec_ipython_client' / 'plugins' / 'flomni' / default_correction_file_rel
csaxs_bec_basepath.parent
/ "bec_ipython_client"
/ "plugins"
/ "flomni"
/ default_correction_file_rel
).resolve()
def reset_correction(self, use_default_correction=True):
"""
Reset the correction to the default values.
@@ -1031,12 +1063,12 @@ class FlomniAlignmentMixin:
else:
params = dev.omny_xray_gui.fit_params_x.get()
#amplitude
tomo_alignment_fit[0][0] = params['SineModel_0_amplitude']
#phase
tomo_alignment_fit[0][1] = params['SineModel_0_shift']
#offset
tomo_alignment_fit[0][2] = params['LinearModel_1_intercept']
# amplitude
tomo_alignment_fit[0][0] = params["SineModel_0_amplitude"]
# phase
tomo_alignment_fit[0][1] = params["SineModel_0_shift"]
# offset
tomo_alignment_fit[0][2] = params["LinearModel_1_intercept"]
print("applying vertical default values from mirror calibration, not from fit!")
tomo_alignment_fit[1][0] = 0
tomo_alignment_fit[1][1] = 0
@@ -1045,7 +1077,6 @@ class FlomniAlignmentMixin:
tomo_alignment_fit[1][4] = 0
print("New alignment parameters loaded based on Xray eye alignment GUI:")
print(
f"X Amplitude {tomo_alignment_fit[0][0]}, "
f"X Phase {tomo_alignment_fit[0][1]}, "
@@ -1186,13 +1217,83 @@ class FlomniAlignmentMixin:
return additional_correction_shift
class _ProgressProxy:
"""Dict-like proxy that persists the flOMNI progress dict as a BEC global variable.
Every read (`proxy["key"]`) fetches the current dict from the global var store,
and every write (`proxy["key"] = val`) fetches, updates, and saves it back.
This makes the progress state visible to all BEC client sessions via
``client.get_global_var("tomo_progress")``.
"""
_GLOBAL_VAR_KEY = "tomo_progress"
_DEFAULTS: dict = {
"subtomo": 0,
"subtomo_projection": 0,
"subtomo_total_projections": 1,
"projection": 0,
"total_projections": 1,
"angle": 0,
"tomo_type": 0,
"tomo_start_time": None,
"estimated_remaining_time": None,
"heartbeat": None,
}
def __init__(self, client):
self._client = client
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _load(self) -> dict:
val = self._client.get_global_var(self._GLOBAL_VAR_KEY)
if val is None:
return dict(self._DEFAULTS)
return val
def _save(self, data: dict) -> None:
self._client.set_global_var(self._GLOBAL_VAR_KEY, data)
# ------------------------------------------------------------------
# Dict-like interface
# ------------------------------------------------------------------
def __getitem__(self, key):
return self._load()[key]
def __setitem__(self, key, value) -> None:
data = self._load()
data[key] = value
self._save(data)
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self._load()!r})"
def get(self, key, default=None):
return self._load().get(key, default)
def update(self, *args, **kwargs) -> None:
"""Update multiple fields in a single round-trip."""
data = self._load()
data.update(*args, **kwargs)
self._save(data)
def reset(self) -> None:
"""Reset all progress fields to their default values."""
self._save(dict(self._DEFAULTS))
def as_dict(self) -> dict:
"""Return a plain copy of the current progress state."""
return self._load()
class Flomni(
FlomniInitStagesMixin,
FlomniSampleTransferMixin,
FlomniAlignmentMixin,
FlomniOpticsMixin,
cSAXSBeamlineChecks,
flomniGuiTools
flomniGuiTools,
):
def __init__(self, client):
super().__init__()
@@ -1208,14 +1309,17 @@ class Flomni(
self.corr_angle_y = []
self.corr_pos_y_2 = []
self.corr_angle_y_2 = []
self.progress = {}
self.progress["subtomo"] = 0
self.progress["subtomo_projection"] = 0
self.progress["subtomo_total_projections"] = 1
self.progress["projection"] = 0
self.progress["total_projections"] = 1
self.progress["angle"] = 0
self.progress["tomo_type"] = 0
self._progress_proxy = _ProgressProxy(self.client)
self._progress_proxy.reset()
from csaxs_bec.bec_ipython_client.plugins.flomni.flomni_webpage_generator import (
FlomniWebpageGenerator,
)
self._webpage_gen = FlomniWebpageGenerator(
bec_client=client,
output_dir="~/data/raw/webpage/",
)
self._webpage_gen.start()
self.OMNYTools = OMNYTools(self.client)
self.reconstructor = PtychoReconstructor(self.ptycho_reconstruct_foldername)
self.tomo_id_manager = TomoIDManager()
@@ -1224,7 +1328,10 @@ class Flomni(
def start_x_ray_eye_alignment(self, keep_shutter_open=False):
if self.OMNYTools.yesno("Starting Xrayeye alignment. Deleting any potential existing alignment for this sample.", "y"):
if self.OMNYTools.yesno(
"Starting Xrayeye alignment. Deleting any potential existing alignment for this sample.",
"y",
):
self.align = XrayEyeAlign(self.client, self)
try:
self.align.align(keep_shutter_open)
@@ -1236,7 +1343,7 @@ class Flomni(
umv(dev.fsamx, fsamx_in)
raise exc
def xrayeye_update_frame(self,keep_shutter_open=False):
def xrayeye_update_frame(self, keep_shutter_open=False):
self.align.update_frame(keep_shutter_open)
def xrayeye_alignment_start(self, keep_shutter_open=False):
@@ -1268,6 +1375,42 @@ class Flomni(
self.special_angles = []
self.special_angle_repeats = 1
@property
def progress(self) -> _ProgressProxy:
"""Proxy dict backed by the BEC global variable ``tomo_progress``.
Readable from any BEC client session via::
client.get_global_var("tomo_progress")
Individual fields can be read and written just like a regular dict::
flomni.progress["projection"] # read
flomni.progress["projection"] = 42 # write (persists immediately)
To update multiple fields atomically use :py:meth:`_ProgressProxy.update`::
flomni.progress.update(projection=42, angle=90.0)
To reset all fields to their defaults::
flomni.progress.reset()
"""
return self._progress_proxy
@progress.setter
def progress(self, val: dict) -> None:
"""Replace the entire progress dict.
Accepts a plain :class:`dict` and persists it to the global var store.
Example::
flomni.progress = {"projection": 0, "total_projections": 100, ...}
"""
if not isinstance(val, dict):
raise TypeError(f"progress must be a dict, got {type(val).__name__!r}")
self._progress_proxy._save(val)
@property
def tomo_shellstep(self):
val = self.client.get_global_var("tomo_shellstep")
@@ -1454,21 +1597,11 @@ class Flomni(
def sample_name(self):
return self.sample_get_name(0)
def write_to_scilog(self, content, tags: list = None):
try:
if tags is not None:
tags.append("BEC")
else:
tags = ["BEC"]
msg = bec.logbook.LogbookMessage()
msg.add_text(content).add_tag(tags)
self.client.logbook.send_logbook_message(msg)
except Exception:
logger.warning("Failed to write to scilog.")
def tomo_alignment_scan(self):
"""
Performs a tomogram alignment scan.
Collects all scan numbers acquired during the alignment, prints them at the end,
and creates a BEC scilog text entry summarising the alignment scan numbers.
"""
if self.get_alignment_offset(0) == (0, 0, 0):
print("It appears that the xrayeye alignemtn was not performend or loaded. Aborting.")
@@ -1476,14 +1609,11 @@ class Flomni(
dev = builtins.__dict__.get("dev")
bec = builtins.__dict__.get("bec")
self.feye_out()
tags = ["BEC_alignment_tomo", self.sample_name]
self.write_to_scilog(
f"Starting alignment scan. First scan number: {bec.queue.next_scan_number}.", tags
)
start_angle = 0
alignment_scan_numbers = []
angle_end = start_angle + 180
for angle in np.linspace(start_angle, angle_end, num=int(180 / 45) + 1, endpoint=True):
@@ -1495,7 +1625,6 @@ class Flomni(
try:
start_scan_number = bec.queue.next_scan_number
self.tomo_scan_projection(angle)
self.tomo_reconstruct()
error_caught = False
except AlarmBase as exc:
if exc.alarm_type == "TimeoutError":
@@ -1504,28 +1633,33 @@ class Flomni(
error_caught = True
else:
raise exc
#todo here was if blchk success, then setting to success true
# todo here was if blchk success, then setting to success true
successful = True
end_scan_number = bec.queue.next_scan_number
for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, 0)
#self._write_tomo_scan_number(scan_nr, angle, 0)
alignment_scan_numbers.append(scan_nr)
umv(dev.fsamroy, 0)
self.OMNYTools.printgreenbold("\n\nAlignment scan finished. Please run SPEC_ptycho_align and load the new fit.")
def _write_subtomo_to_scilog(self, subtomo_number):
dev = builtins.__dict__.get("dev")
bec = builtins.__dict__.get("bec")
if self.tomo_id > 0:
tags = ["BEC_subtomo", self.sample_name, f"tomo_id_{self.tomo_id}"]
else:
tags = ["BEC_subtomo", self.sample_name]
self.write_to_scilog(
f"Starting subtomo: {subtomo_number}. First scan number: {bec.queue.next_scan_number}.",
tags,
self.OMNYTools.printgreenbold(
"\n\nAlignment scan finished. Please run SPEC_ptycho_align and load the new fit by flomni.read_alignment_offset() ."
)
# summary of alignment scan numbers
scan_list_str = ", ".join(str(s) for s in alignment_scan_numbers)
#print(f"\nAlignment scan numbers ({len(alignment_scan_numbers)} total): {scan_list_str}")
# BEC scilog entry (no logo)
scilog_content = (
f"Alignment scan finished.\n"
f"Sample: {self.sample_name}\n"
f"Number of alignment scans: {len(alignment_scan_numbers)}\n"
f"Alignment scan numbers: {scan_list_str}\n"
)
print(scliog_content)
bec.messaging.scilog.new().add_text(scilog_content.replace("\n", "<br>")).add_tags("alignmentscan").send()
def sub_tomo_scan(self, subtomo_number, start_angle=None):
"""
Performs a sub tomogram scan.
@@ -1533,18 +1667,6 @@ class Flomni(
subtomo_number (int): The sub tomogram number.
start_angle (float, optional): The start angle of the scan. Defaults to None.
"""
# dev = builtins.__dict__.get("dev")
# bec = builtins.__dict__.get("bec")
# if self.tomo_id > 0:
# tags = ["BEC_subtomo", self.sample_name, f"tomo_id_{self.tomo_id}"]
# else:
# tags = ["BEC_subtomo", self.sample_name]
# self.write_to_scilog(
# f"Starting subtomo: {subtomo_number}. First scan number: {bec.queue.next_scan_number}.",
# tags,
# )
self._write_subtomo_to_scilog(subtomo_number)
if start_angle is not None:
print(f"Sub tomo scan with start angle {start_angle} requested.")
@@ -1567,20 +1689,19 @@ class Flomni(
elif subtomo_number == 8:
start_angle = self.tomo_angle_stepsize / 8.0 * 7
# _tomo_shift_angles (potential global variable)
_tomo_shift_angles = 0
# compute number of projections
start = start_angle + _tomo_shift_angles
if subtomo_number % 2: # odd = forward
if subtomo_number % 2: # odd = forward
max_allowed_angle = 180.05 + self.tomo_angle_stepsize
proposed_end = start + 180
angle_end = min(proposed_end, max_allowed_angle)
span = angle_end - start
else: # even = reverse
else: # even = reverse
min_allowed_angle = 0
proposed_end = start - 180
angle_end = max(proposed_end, min_allowed_angle)
@@ -1589,38 +1710,23 @@ class Flomni(
# number of projections needed to maintain step size
N = int(span / self.tomo_angle_stepsize) + 1
angles = np.linspace(
start,
angle_end,
num=N,
endpoint=True,
)
angles = np.linspace(start, angle_end, num=N, endpoint=True)
if subtomo_number % 2: # odd subtomos → forward direction
if subtomo_number % 2: # odd subtomos → forward direction
# clamp end angle to max allowed
max_allowed_angle = 180.05 + self.tomo_angle_stepsize
proposed_end = start + 180
angle_end = min(proposed_end, max_allowed_angle)
angles = np.linspace(
start,
angle_end,
num=N,
endpoint=True,
)
angles = np.linspace(start, angle_end, num=N, endpoint=True)
else: # even subtomos → reverse direction
else: # even subtomos → reverse direction
# go FROM start_angle down toward 0
min_allowed_angle = 0
proposed_end = start - 180
angle_end = max(proposed_end, min_allowed_angle)
angles = np.linspace(
start,
angle_end,
num=N,
endpoint=True,
)
angles = np.linspace(start, angle_end, num=N, endpoint=True)
for i, angle in enumerate(angles):
@@ -1636,9 +1742,9 @@ class Flomni(
self._subtomo_offset = 0
else:
if subtomo_number % 2: # odd = forward direction
if subtomo_number % 2: # odd = forward direction
self._subtomo_offset = round(sa / step)
else: # even = reverse direction
else: # even = reverse direction
self._subtomo_offset = round((180 - sa) / step)
# progress index must always increase
@@ -1647,22 +1753,24 @@ class Flomni(
# existing progress fields
self.progress["subtomo_total_projections"] = int(180 / self.tomo_angle_stepsize)
self.progress["projection"] = (subtomo_number - 1) * self.progress["subtomo_total_projections"] + self.progress["subtomo_projection"]
self.progress["projection"] = (subtomo_number - 1) * self.progress[
"subtomo_total_projections"
] + self.progress["subtomo_projection"]
self.progress["total_projections"] = 180 / self.tomo_angle_stepsize * 8
self.progress["angle"] = angle
# finally do the scan at this angle
self._tomo_scan_at_angle(angle, subtomo_number)
def _tomo_scan_at_angle(self, angle, subtomo_number):
successful = False
error_caught = False
if 0 <= angle < 180.05:
self.progress["heartbeat"] = datetime.datetime.now().isoformat()
print(f"Starting flOMNI scan for angle {angle} in subtomo {subtomo_number}")
self._print_progress()
while not successful:
#self.bl_chk._bl_chk_start()
# self.bl_chk._bl_chk_start()
if not self.special_angles:
self._current_special_angles = []
if self._current_special_angles:
@@ -1697,12 +1805,14 @@ class Flomni(
"""start a tomo scan"""
if not self._check_eye_out_and_optics_in():
print("Attention: The setup is not in measurement condition.\nXray eye might be IN or the Xray optics OUT.")
print(
"Attention: The setup is not in measurement condition.\nXray eye might be IN or the Xray optics OUT."
)
if self.OMNYTools.yesno("Shall I continue?", "n"):
print("OK")
else:
print("Stopping.")
return
print("OK")
else:
print("Stopping.")
return
self.flomnigui_show_progress()
@@ -1730,6 +1840,8 @@ class Flomni(
# self.write_pdf_report()
# else:
self.tomo_id = 0
self.write_pdf_report()
self.progress["tomo_start_time"] = datetime.datetime.now().isoformat()
with scans.dataset_id_on_hold:
if self.tomo_type == 1:
@@ -1738,7 +1850,7 @@ class Flomni(
for ii in range(subtomo_start, 9):
self.sub_tomo_scan(ii, start_angle=start_angle)
start_angle = None
elif self.tomo_type == 2:
# Golden ratio tomography
previous_subtomo_number = -1
@@ -1749,7 +1861,6 @@ class Flomni(
while True:
angle, subtomo_number = self._golden(ii, self.golden_ratio_bunch_size, 180, 1)
if previous_subtomo_number != subtomo_number:
self._write_subtomo_to_scilog(subtomo_number)
if (
subtomo_number % 2 == 1
and ii > 10
@@ -1797,7 +1908,6 @@ class Flomni(
ii, int(180 / self.tomo_angle_stepsize), 180, 1, 0
)
if previous_subtomo_number != subtomo_number:
self._write_subtomo_to_scilog(subtomo_number)
if (
subtomo_number % 2 == 1
and ii > 10
@@ -1834,19 +1944,47 @@ class Flomni(
else:
raise FlomniError("undefined tomo type")
self.progress['projection'] = self.progress['total_projections']
self.progress["projection"] = self.progress["total_projections"]
self.progress["subtomo_projection"] = self.progress["subtomo_total_projections"]
self._print_progress()
self._print_progress()
self.OMNYTools.printgreenbold("Tomoscan finished")
@staticmethod
def _format_duration(seconds: float) -> str:
"""Format a duration in seconds as a human-readable string, e.g. '2h 03m 15s'."""
seconds = int(seconds)
h, remainder = divmod(seconds, 3600)
m, s = divmod(remainder, 60)
if h > 0:
return f"{h}h {m:02d}m {s:02d}s"
if m > 0:
return f"{m}m {s:02d}s"
return f"{s}s"
def _print_progress(self):
# --- compute and store estimated remaining time -----------------------
start_str = self.progress.get("tomo_start_time")
projection = self.progress["projection"]
total = self.progress["total_projections"]
if start_str is not None and total > 0 and projection > 9:
elapsed = (
datetime.datetime.now() - datetime.datetime.fromisoformat(start_str)
).total_seconds()
rate = projection / elapsed # projections per second
remaining_s = (total - projection) / rate
self.progress["estimated_remaining_time"] = remaining_s
eta_str = self._format_duration(remaining_s)
else:
eta_str = "N/A"
# ----------------------------------------------------------------------
print("\x1b[95mProgress report:")
print(f"Tomo type: ....................... {self.progress['tomo_type']}")
print(f"Projection: ...................... {self.progress['projection']:.0f}")
print(f"Total projections expected ....... {self.progress['total_projections']}")
print(f"Angle: ........................... {self.progress['angle']}")
print(f"Current subtomo: ................. {self.progress['subtomo']}")
print(f"Current projection within subtomo: {self.progress['subtomo_projection']}\x1b[0m")
print(f"Current projection within subtomo: {self.progress['subtomo_projection']}")
print(f"Estimated remaining time: ........ {eta_str}\x1b[0m")
self._flomnigui_update_progress()
def add_sample_database(
@@ -1870,7 +2008,6 @@ class Flomni(
return
self.tomo_scan_projection(angle)
self.tomo_reconstruct()
def _golden(self, ii, howmany_sorted, maxangle, reverse=False):
"""returns the iis golden ratio angle of sorted bunches of howmany_sorted and its subtomo number"""
@@ -1930,9 +2067,7 @@ class Flomni(
)
def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None:
tomo_scan_numbers_file = os.path.expanduser(
"~/data/raw/logs/tomography_scannumbers.txt"
)
tomo_scan_numbers_file = os.path.expanduser("~/data/raw/logs/tomography_scannumbers.txt")
with open(tomo_scan_numbers_file, "a+") as out_file:
# pylint: disable=undefined-variable
out_file.write(
@@ -1943,7 +2078,6 @@ class Flomni(
dev.rtx.controller.laser_tracker_check_signalstrength()
scans = builtins.__dict__.get("scans")
# additional_correction = self.align.compute_additional_correction(angle)
@@ -1978,7 +2112,7 @@ class Flomni(
f"{str(datetime.datetime.now())}: flomni scan projection at angle {angle}, scan"
f" number {bec.queue.next_scan_number}.\n"
)
# self.write_to_scilog(log_message, ["BEC_scans", self.sample_name])
scans.flomni_fermat_scan(
fovx=self.fovx,
fovy=self.fovy,
@@ -1991,6 +2125,9 @@ class Flomni(
corridor_size=corridor_size,
)
self.tomo_reconstruct()
def tomo_parameters(self):
"""print and update the tomo parameters"""
print("Current settings:")
@@ -2033,7 +2170,6 @@ class Flomni(
)
print(f"\nSample name: {self.sample_name}\n")
if self.OMNYTools.yesno("Are these parameters correctly set for your scan?", "y"):
print("... excellent!")
else:
@@ -2130,19 +2266,21 @@ class Flomni(
+ ' 888 888 "Y88888P" 888 888 888 Y888 8888888 \n'
)
padding = 20
fovxy = f"{self.fovx:.2f}/{self.fovy:.2f}"
stitching = f"{self.stitch_x:.2f}/{self.stitch_y:.2f}"
fovxy = f"{self.fovx:.1f}/{self.fovy:.1f}"
stitching = f"{self.stitch_x:.0f}/{self.stitch_y:.0f}"
dataset_id = str(self.client.queue.next_dataset_number)
account = bec.active_account
content = [
f"{'Sample Name:':<{padding}}{self.sample_name:>{padding}}\n",
f"{'Measurement ID:':<{padding}}{str(self.tomo_id):>{padding}}\n",
f"{'Dataset ID:':<{padding}}{dataset_id:>{padding}}\n",
f"{'Sample Info:':<{padding}}{'Sample Info':>{padding}}\n",
f"{'e-account:':<{padding}}{str(self.client.username):>{padding}}\n",
f"{'e-account:':<{padding}}{str(account):>{padding}}\n",
f"{'Number of projections:':<{padding}}{int(180 / self.tomo_angle_stepsize * 8):>{padding}}\n",
f"{'First scan number:':<{padding}}{self.client.queue.next_scan_number:>{padding}}\n",
f"{'Last scan number approx.:':<{padding}}{self.client.queue.next_scan_number + int(180 / self.tomo_angle_stepsize * 8) + 10:>{padding}}\n",
f"{'Current photon energy:':<{padding}}{dev.mokev.read()['mokev']['value']:>{padding}.4f}\n",
f"{'Current photon energy:':<{padding}}To be implemented\n",
#f"{'Current photon energy:':<{padding}}{dev.mokev.read()['mokev']['value']:>{padding}.4f}\n",
f"{'Exposure time:':<{padding}}{self.tomo_countingtime:>{padding}.2f}\n",
f"{'Fermat spiral step size:':<{padding}}{self.tomo_shellstep:>{padding}.2f}\n",
f"{'FOV:':<{padding}}{fovxy:>{padding}}\n",
@@ -2151,20 +2289,38 @@ class Flomni(
f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n",
]
content = "".join(content)
user_target = os.path.expanduser(f"~/Data10/documentation/tomo_scan_ID_{self.tomo_id}.pdf")
user_target = os.path.expanduser(f"~/data/raw/documentation/tomo_scan_ID_{self.tomo_id}.pdf")
with PDFWriter(user_target) as file:
file.write(header)
file.write(content)
subprocess.run(
"xterm /work/sls/spec/local/XOMNY/bin/upload/upload_last_pon.sh &", shell=True
)
# subprocess.run(
# "xterm /work/sls/spec/local/XOMNY/bin/upload/upload_last_pon.sh &", shell=True
# )
# status = subprocess.run(f"cp /tmp/spec-e20131-specES1.pdf {user_target}", shell=True)
msg = bec.logbook.LogbookMessage()
logo_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LamNI_logo.png")
msg.add_file(logo_path).add_text("".join(content).replace("\n", "</p><p>")).add_tag(
["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "LamNI", self.sample_name]
)
self.client.logbook.send_logbook_message(msg)
# msg = bec.tomo_progress.tomo_progressMessage()
# logo_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LamNI_logo.png")
# msg.add_file(logo_path).add_text("".join(content).replace("\n", "</p><p>")).add_tag(
# ["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "flOMNI", self.sample_name]
# )
# self.client.tomo_progress.send_tomo_progress_message("~/data/raw/documentation/tomo_scan_ID_{self.tomo_id}.pdf").send()
import csaxs_bec
# Ensure this is a Path object, not a string
csaxs_bec_basepath = Path(csaxs_bec.__file__)
logo_file_rel = "flOMNI.png"
# Build the absolute path correctly
logo_file = (
csaxs_bec_basepath.parent
/ "bec_ipython_client"
/ "plugins"
/ "flomni"
/ logo_file_rel
).resolve()
print(logo_file)
bec.messaging.scilog.new().add_attachment(logo_file).add_text(content.replace("\n", "<br>")).add_tags("tomoscan").send()
if __name__ == "__main__":
@@ -2181,4 +2337,4 @@ if __name__ == "__main__":
builtins.__dict__["bec"] = bec
builtins.__dict__["umv"] = umv
flomni = Flomni(bec)
flomni.start_x_ray_eye_alignment()
flomni.start_x_ray_eye_alignment()

View File

@@ -50,8 +50,6 @@ class FlomniOpticsMixin:
# move both axes to the desired "in" positions
umv(dev.feyex, feyex_in, dev.feyey, feyey_in)
self.xrayeye_update_frame()
def _ffzp_in(self):
foptx_in = self._get_user_param_safe("foptx", "in")
fopty_in = self._get_user_param_safe("fopty", "in")

File diff suppressed because it is too large Load Diff

View File

@@ -18,10 +18,17 @@ class flomniGuiToolsError(Exception):
class flomniGuiTools:
GUI_RPC_TIMEOUT = 20
def __init__(self):
self.text_box = None
self.progressbar = None
self.flomni_window = None
self.xeyegui = None
self.pdf_viewer = None
self.idle_text_box = None
self.camera_gripper_image = None
self.camera_overview_image = None
def set_client(self, client):
self.client = client
@@ -29,9 +36,10 @@ class flomniGuiTools:
def flomnigui_show_gui(self):
if "flomni" in self.gui.windows:
self.flomni_window = self.gui.windows["flomni"]
self.gui.flomni.raise_window()
else:
self.gui.new("flomni")
self.flomni_window = self.gui.new("flomni", timeout=self.GUI_RPC_TIMEOUT)
time.sleep(1)
def flomnigui_stop_gui(self):
@@ -42,9 +50,11 @@ class flomniGuiTools:
def flomnigui_show_xeyealign(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("xeyegui"):
if self._flomnigui_is_missing("xeyegui"):
self.flomnigui_remove_all_docks()
self.xeyegui = self.gui.flomni.new("XRayEye", object_name="xrayeye")
self.xeyegui = self.gui.flomni.new(
"XRayEye", object_name="xrayeye", timeout=self.GUI_RPC_TIMEOUT
)
# start live
if not dev.cam_xeye.live_mode_enabled.get():
dev.cam_xeye.live_mode_enabled.put(True)
@@ -52,9 +62,11 @@ class flomniGuiTools:
def flomnigui_show_xeyealign_fittab(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("xeyegui"):
if self._flomnigui_is_missing("xeyegui"):
self.flomnigui_remove_all_docks()
self.xeyegui = self.gui.flomni.new("XRayEye")
self.xeyegui = self.gui.flomni.new(
"XRayEye", object_name="xrayeye", timeout=self.GUI_RPC_TIMEOUT
)
self.xeyegui.switch_tab("fit")
def _flomnigui_check_attribute_not_exists(self, attribute_name):
@@ -70,31 +82,39 @@ class flomniGuiTools:
return False
return True
def _flomnigui_is_missing(self, attribute_name):
widget = getattr(self, attribute_name, None)
if widget is None:
return True
if hasattr(widget, "_is_deleted") and widget._is_deleted():
return True
return False
def flomnigui_show_cameras(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists(
"cam_flomni_gripper"
) or self._flomnigui_check_attribute_not_exists("cam_flomni_overview"):
self.flomnigui_remove_all_docks()
camera_gripper_image = self.gui.flomni.new("Image")
self.camera_gripper_image = self.gui.flomni.new("Image")
if self._flomnicam_check_device_exists(dev.cam_flomni_gripper):
camera_gripper_image.image(device="cam_flomni_gripper", signal="preview")
camera_gripper_image.lock_aspect_ratio = True
camera_gripper_image.enable_fps_monitor = True
camera_gripper_image.enable_toolbar = False
camera_gripper_image.outer_axes = False
camera_gripper_image.inner_axes = False
self.camera_gripper_image.image(device="cam_flomni_gripper", signal="preview")
self.camera_gripper_image.lock_aspect_ratio = True
self.camera_gripper_image.enable_fps_monitor = True
self.camera_gripper_image.enable_toolbar = False
self.camera_gripper_image.outer_axes = False
self.camera_gripper_image.inner_axes = False
dev.cam_flomni_gripper.start_live_mode()
else:
print("Cannot open camera_gripper. Device does not exist.")
camera_overview_image = self.gui.flomni.new("Image")
self.camera_overview_image = self.gui.flomni.new("Image")
if self._flomnicam_check_device_exists(dev.cam_flomni_overview):
camera_overview_image.image(device="cam_flomni_overview", signal="preview")
camera_overview_image.lock_aspect_ratio = True
camera_overview_image.enable_fps_monitor = True
camera_overview_image.enable_toolbar = False
camera_overview_image.outer_axes = False
camera_overview_image.inner_axes = False
self.camera_overview_image.image(device="cam_flomni_overview", signal="preview")
self.camera_overview_image.lock_aspect_ratio = True
self.camera_overview_image.enable_fps_monitor = True
self.camera_overview_image.enable_toolbar = False
self.camera_overview_image.outer_axes = False
self.camera_overview_image.inner_axes = False
dev.cam_flomni_overview.start_live_mode()
else:
print("Cannot open camera_overview. Device does not exist.")
@@ -104,15 +124,20 @@ class flomniGuiTools:
# dev.cam_flomni_gripper.stop_live_mode()
# dev.cam_xeye.live_mode = False
if hasattr(self.gui, "flomni"):
self.gui.flomni.delete_all()
self.gui.flomni.delete_all(timeout=self.GUI_RPC_TIMEOUT)
self.progressbar = None
self.text_box = None
self.xeyegui = None
self.pdf_viewer = None
self.idle_text_box = None
self.camera_gripper_image = None
self.camera_overview_image = None
def flomnigui_idle(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("idle_text_box"):
if self._flomnigui_is_missing("idle_text_box"):
self.flomnigui_remove_all_docks()
idle_text_box = self.gui.flomni.new("TextBox")
self.idle_text_box = self.gui.flomni.new("TextBox")
text = (
"<pre>"
+ "██████╗ ███████╗ ██████╗ ███████╗██╗ ██████╗ ███╗ ███╗███╗ ██╗██╗\n"
@@ -123,7 +148,7 @@ class flomniGuiTools:
+ "╚═════╝ ╚══════╝ ╚═════╝ ╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝\n"
+ "</pre>"
)
idle_text_box.set_html_text(text)
self.idle_text_box.set_html_text(text)
def flomnigui_docs(self, filename: str | None = None):
import csaxs_bec
@@ -167,7 +192,7 @@ class flomniGuiTools:
# --- GUI handling (active existence check) ----------------------------
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("PdfViewerWidget"):
if self._flomnigui_is_missing("pdf_viewer"):
self.flomnigui_remove_all_docks()
self.pdf_viewer = self.gui.flomni.new(widget="PdfViewerWidget")
@@ -185,7 +210,7 @@ class flomniGuiTools:
def flomnigui_show_progress(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("progressbar"):
if self._flomnigui_is_missing("progressbar"):
self.flomnigui_remove_all_docks()
# Add a new dock with a RingProgressBar widget
self.progressbar = self.gui.flomni.new("RingProgressBar")
@@ -198,6 +223,14 @@ class flomniGuiTools:
self._flomnigui_update_progress()
def _flomnigui_update_progress(self):
"""Update the progress ring bar and center label from the current progress state.
``self.progress`` is backed by the BEC global variable ``tomo_progress``
(see :class:`_ProgressProxy` in ``flomni.py``), so this method reflects
the live state that is also accessible from other BEC client sessions via::
client.get_global_var("tomo_progress")
"""
main_progress_ring = self.progressbar.rings[0]
subtomo_progress_ring = self.progressbar.rings[1]
if self.progressbar is not None:
@@ -210,6 +243,31 @@ class flomniGuiTools:
main_progress_ring.set_value(progress)
subtomo_progress_ring.set_value(subtomo_progress)
# --- format start time for display --------------------------------
start_str = self.progress.get("tomo_start_time")
if start_str is not None:
import datetime as _dt
start_display = _dt.datetime.fromisoformat(start_str).strftime("%Y-%m-%d %H:%M:%S")
else:
start_display = "N/A"
# --- format estimated remaining time ------------------------------
remaining_s = self.progress.get("estimated_remaining_time")
if remaining_s is not None and remaining_s >= 0:
import datetime as _dt
remaining_s = int(remaining_s)
h, rem = divmod(remaining_s, 3600)
m, s = divmod(rem, 60)
if h > 0:
eta_display = f"{h}h {m:02d}m {s:02d}s"
elif m > 0:
eta_display = f"{m}m {s:02d}s"
else:
eta_display = f"{s}s"
else:
eta_display = "N/A"
# ------------------------------------------------------------------
text = (
f"Progress report:\n"
f" Tomo type: {self.progress['tomo_type']}\n"
@@ -218,7 +276,9 @@ class flomniGuiTools:
f" Angle: {self.progress['angle']:.1f}\n"
f" Current subtomo: {self.progress['subtomo']}\n"
f" Current projection within subtomo: {self.progress['subtomo_projection']}\n"
f" Total projections per subtomo: {int(self.progress['subtomo_total_projections'])}"
f" Total projections per subtomo: {int(self.progress['subtomo_total_projections'])}\n"
f" Scan started: {start_display}\n"
f" Est. remaining: {eta_display}"
)
self.progressbar.set_center_label(text)

View File

@@ -253,6 +253,8 @@ class XrayEyeAlign:
umv(dev.rtx, 0)
print("You are ready to remove the xray eye and start ptychography scans.")
print("Fine alignment: flomni.tomo_parameters() , then flomni.tomo_alignment_scan()")
print("After that, run the fit in Matlab and load the new fit flomni.read_alignment_offset()")
def write_output(self):
file = os.path.expanduser("~/Data10/specES1/internal/xrayeye_alignmentvalues")

Binary file not shown.

After

Width:  |  Height:  |  Size: 359 KiB

View File

@@ -0,0 +1,96 @@
"""
omny/webpage_generator.py
==========================
OMNY-specific webpage generator subclass.
Integration (inside the OMNY __init__ / startup):
--------------------------------------------------
from csaxs_bec.bec_ipython_client.plugins.omny.webpage_generator import (
OmnyWebpageGenerator,
)
self._webpage_gen = OmnyWebpageGenerator(
bec_client=client,
output_dir="~/data/raw/webpage/",
)
self._webpage_gen.start()
Or use the factory (auto-selects by session name "omny"):
---------------------------------------------------------
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
make_webpage_generator,
)
self._webpage_gen = make_webpage_generator(bec, output_dir="~/data/raw/webpage/")
self._webpage_gen.start()
Interactive helpers:
--------------------
omny._webpage_gen.status()
omny._webpage_gen.verbosity = 2
omny._webpage_gen.stop()
omny._webpage_gen.start()
"""
from pathlib import Path
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
WebpageGeneratorBase,
_safe_get,
_safe_float,
_gvar,
)
class OmnyWebpageGenerator(WebpageGeneratorBase):
"""
OMNY-specific webpage generator.
Logo: OMNY.png from the same directory as this file.
Override _collect_setup_data() to add OMNY-specific temperatures,
sample name, and measurement settings.
The old OMNY spec webpage showed:
- Cryo temperatures (XOMNY-TEMP-CRYO-A/B)
- Per-channel temperatures (XOMNY-TEMP1..48)
- Dewar pressure / LN2 flow
- Interferometer strengths (OINTERF)
Map these to BEC device paths below once available.
"""
# TODO: fill in OMNY-specific device paths
# label -> dotpath under device_manager.devices
_TEMP_MAP = {
# "Sample (cryo A)": "omny_temp.cryo_a",
# "Cryo head (B)": "omny_temp.cryo_b",
}
def _logo_path(self):
return Path(__file__).parent / "OMNY.png"
def _collect_setup_data(self) -> dict:
# ── OMNY-specific data goes here ──────────────────────────────
# Uncomment and adapt when device names are known:
#
# dm = self._bec.device_manager
# sample_name = _safe_get(dm, "omny_samples.sample_names.sample0") or "N/A"
# temperatures = {
# label: _safe_float(_safe_get(dm, path))
# for label, path in self._TEMP_MAP.items()
# }
# settings = {
# "Sample name": sample_name,
# "FOV x / y": ...,
# "Exposure time": _gvar(self._bec, "tomo_countingtime", ".3f", " s"),
# "Angle step": _gvar(self._bec, "tomo_angle_stepsize", ".2f", "\u00b0"),
# }
# return {
# "type": "omny",
# "sample_name": sample_name,
# "temperatures": temperatures,
# "settings": settings,
# }
# Placeholder — returns minimal info until implemented
return {
"type": "omny",
# OMNY-specific data here
}

View File

@@ -38,12 +38,14 @@ class XRayEye(RPCBase):
None
"""
@rpc_timeout(20)
@rpc_call
def on_live_view_enabled(self, enabled: "bool"):
"""
None
"""
@rpc_timeout(20)
@rpc_call
def on_motors_enable(self, x_enable: "bool", y_enable: "bool"):
"""
@@ -54,6 +56,7 @@ class XRayEye(RPCBase):
y_enable(bool): enable y motor controls
"""
@rpc_timeout(20)
@rpc_call
def enable_submit_button(self, enable: "bool"):
"""
@@ -90,12 +93,14 @@ class XRayEye(RPCBase):
None
"""
@rpc_timeout(20)
@rpc_call
def switch_tab(self, tab: "str"):
"""
None
"""
@rpc_timeout(20)
@rpc_call
def submit_fit_array(self, fit_array):
"""

View File

@@ -4,6 +4,7 @@ from bec_lib import bec_logger
from bec_lib.endpoints import MessageEndpoints
from bec_qthemes import material_icon
from bec_widgets import BECWidget, SafeProperty, SafeSlot
from bec_widgets.utils.rpc_decorator import rpc_timeout
from bec_widgets.widgets.plots.image.image import Image
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from bec_widgets.widgets.plots.image.setting_widgets.image_roi_tree import ROIPropertyTree
@@ -367,6 +368,7 @@ class XRayEye(BECWidget, QWidget):
return self.message_line_edit.toPlainText()
@user_message.setter
@rpc_timeout(20)
def user_message(self, message: str):
self.message_line_edit.setText(message)
@@ -375,6 +377,7 @@ class XRayEye(BECWidget, QWidget):
return self.sample_name_line_edit.text()
@sample_name.setter
@rpc_timeout(20)
def sample_name(self, message: str):
self.sample_name_line_edit.setText(message)
@@ -395,6 +398,7 @@ class XRayEye(BECWidget, QWidget):
################################################################################
@SafeSlot(str)
@rpc_timeout(20)
def switch_tab(self, tab: str):
if tab == "fit":
self.tab_widget.setCurrentIndex(1)
@@ -412,6 +416,7 @@ class XRayEye(BECWidget, QWidget):
return roi.get_coordinates()
@SafeSlot(bool)
@rpc_timeout(20)
def on_live_view_enabled(self, enabled: bool):
logger.info(f"Live view is enabled: {enabled}")
self.live_preview_toggle.blockSignals(True)
@@ -460,6 +465,7 @@ class XRayEye(BECWidget, QWidget):
self.shutter_toggle.blockSignals(False)
@SafeSlot(bool, bool)
@rpc_timeout(20)
def on_motors_enable(self, x_enable: bool, y_enable: bool):
"""
Enable/Disable motor controls
@@ -472,6 +478,7 @@ class XRayEye(BECWidget, QWidget):
self.motor_control_2d.enable_controls_ver(y_enable)
@SafeSlot(bool)
@rpc_timeout(20)
def enable_submit_button(self, enable: bool):
"""
Enable/disable submit button.
@@ -509,6 +516,7 @@ class XRayEye(BECWidget, QWidget):
print(f"meta: {meta}")
@SafeSlot()
@rpc_timeout(20)
def submit_fit_array(self, fit_array):
self.tab_widget.setCurrentIndex(1)
# self.fix_x.title = " got fit array"

View File

@@ -395,6 +395,16 @@ rtz:
readoutPriority: on_request
connectionTimeout: 20
rt_flyer:
deviceClass: csaxs_bec.devices.omny.rt.rt_flomni_ophyd.RtFlomniFlyer
deviceConfig:
host: mpc2844.psi.ch
port: 2222
readoutPriority: async
connectionTimeout: 20
enabled: true
readOnly: False
############################################################
####################### Cameras ############################
############################################################

View File

@@ -317,8 +317,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
try:
scan_done = bool(value == self._num_total_triggers)
self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done)
if scan_done:
self._scan_done_event.set()
except Exception:
content = traceback.format_exc()
logger.info(f"Device {self.name} error: {content}")
@@ -393,6 +391,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
self._current_data_index = 0
# NOTE Make sure that the signal that omits mca callbacks is cleared
# DO NOT REMOVE!!
self._omit_mca_callbacks.clear()
# For a fly scan we need to start the mcs card ourselves
@@ -563,8 +562,9 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
def on_stop(self) -> None:
"""Hook called when the device is stopped. In addition, any status that is registered through cancel_on_stop will be cancelled here."""
self.stop_all.put(1)
self.erase_all.put(1)
with suppress_mca_callbacks(self):
self.stop_all.put(1)
self.erase_all.put(1)
def mcs_recovery(self, timeout: int = 1) -> None:
"""

View File

@@ -1,20 +1,18 @@
import threading
import time
from typing import List
import numpy as np
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError
from ophyd_devices import AsyncMultiSignal, DeviceStatus, ProgressSignal
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected
from prettytable import PrettyTable
from csaxs_bec.devices.omny.rt.rt_ophyd import (
BECConfigError,
RtCommunicationError,
RtError,
RtReadbackSignal,
@@ -432,27 +430,6 @@ class RtFlomniController(Controller):
t.add_row([i, self.read_ssi_interferometer(i)])
print(t)
def _get_signals_from_table(self, return_table) -> dict:
self.average_stdeviations_x_st_fzp += float(return_table[4])
self.average_stdeviations_y_st_fzp += float(return_table[7])
signals = {
"target_x": {"value": float(return_table[2])},
"average_x_st_fzp": {"value": float(return_table[3])},
"stdev_x_st_fzp": {"value": float(return_table[4])},
"target_y": {"value": float(return_table[5])},
"average_y_st_fzp": {"value": float(return_table[6])},
"stdev_y_st_fzp": {"value": float(return_table[7])},
"average_rotz": {"value": float(return_table[8])},
"stdev_rotz": {"value": float(return_table[9])},
"average_stdeviations_x_st_fzp": {
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
},
"average_stdeviations_y_st_fzp": {
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
},
}
return signals
@threadlocked
def start_scan(self):
if not self.feedback_is_running():
@@ -492,91 +469,6 @@ class RtFlomniController(Controller):
current_position_in_scan = int(float(return_table[2]))
return (mode, number_of_positions_planned, current_position_in_scan)
def read_positions_from_sampler(self):
# this was for reading after the scan completed
number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read
read_counter = 0
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.average_lamni_angle = 0
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
# if not (mode==2 or mode==3):
# error
self.device_manager.connector.set(
MessageEndpoints.device_status("rt_scan"),
messages.DeviceStatusMessage(
device="rt_scan", status=1, metadata=self.readout_metadata
),
)
# while scan is running
while mode > 0:
# TODO here?: scan abortion if no progress in scan *raise error
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
time.sleep(0.01)
if current_position_in_scan > 5:
while current_position_in_scan > read_counter + 1:
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
# logger.info(f"{return_table}")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.publish_device_data(signals=signals, point_id=int(return_table[0]))
time.sleep(0.05)
# read the last samples even though scan is finished already
while number_of_positions_planned > read_counter:
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
# logger.info(f"{return_table}")
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.publish_device_data(signals=signals, point_id=int(return_table[0]))
self.device_manager.connector.set(
MessageEndpoints.device_status("rt_scan"),
messages.DeviceStatusMessage(
device="rt_scan", status=0, metadata=self.readout_metadata
),
)
logger.info(
"Flomni statistics: Average of all standard deviations: x"
f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y"
f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}"
)
def publish_device_data(self, signals, point_id):
self.device_manager.connector.set_and_publish(
MessageEndpoints.device_read("rt_flomni"),
messages.DeviceMessage(
signals=signals, metadata={"point_id": point_id, **self.readout_metadata}
),
)
def start_readout(self):
readout = threading.Thread(target=self.read_positions_from_sampler)
readout.start()
def kickoff(self, metadata):
self.readout_metadata = metadata
while not self._min_scan_buffer_reached:
time.sleep(0.001)
self.start_scan()
time.sleep(0.1)
self.start_readout()
class RtFlomniReadbackSignal(RtReadbackSignal):
@retry_once
@@ -844,6 +736,185 @@ class RtFlomniMotor(Device, PositionerBase):
return super().stop(success=success)
class RtFlomniFlyer(Device):
USER_ACCESS = ["controller"]
data = Cpt(
AsyncMultiSignal,
name="data",
signals=[
"target_x",
"average_x_st_fzp",
"stdev_x_st_fzp",
"target_y",
"average_y_st_fzp",
"stdev_y_st_fzp",
"average_rotz",
"stdev_rotz",
"average_stdeviations_x_st_fzp",
"average_stdeviations_y_st_fzp",
],
ndim=1,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
)
progress = Cpt(
ProgressSignal, doc="ProgressSignal indicating the progress of the device during a scan."
)
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
host="mpc2844.psi.ch",
port=2222,
socket_cls=SocketIO,
device_manager=None,
**kwargs,
):
super().__init__(prefix=prefix, name=name, parent=parent, **kwargs)
self.shutdown_event = threading.Event()
self.controller = RtFlomniController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
)
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.average_lamni_angle = 0
self.readout_thread = None
self.scan_done_event = threading.Event()
self.scan_done_event.set()
def read_positions_from_sampler(self, status: DeviceStatus):
"""
Read the positions from the sampler and update the data signal.
This function runs in a separate thread and continuously checks the
scan status.
Args:
status (DeviceStatus): The status object to update when the readout is complete.
"""
read_counter = 0
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.average_lamni_angle = 0
mode, number_of_positions_planned, current_position_in_scan = (
self.controller.get_scan_status()
)
# while scan is running
while mode > 0 and not self.shutdown_event.wait(0.01):
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
mode, number_of_positions_planned, current_position_in_scan = (
self.controller.get_scan_status()
)
if current_position_in_scan > 5:
while current_position_in_scan > read_counter + 1:
return_table = (
self.controller.socket_put_and_receive(f"r{read_counter}")
).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
self.progress.put(
value=read_counter, max_value=number_of_positions_planned, done=False
)
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.data.set(signals)
if self.shutdown_event.wait(0.05):
logger.info("Shutdown event set, stopping readout.")
# if we are here, the shutdown_event is set. We can exit the readout loop.
status.set_finished()
return
# read the last samples even though scan is finished already
while number_of_positions_planned > read_counter and not self.shutdown_event.is_set():
return_table = (self.controller.socket_put_and_receive(f"r{read_counter}")).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=False)
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.data.set(signals)
# NOTE: No need to set the status to failed if the shutdown_event is set.
# The stop() method will take care of that.
status.set_finished()
self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=True)
logger.info(
"Flomni statistics: Average of all standard deviations: x"
f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y"
f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}"
)
def _get_signals_from_table(self, return_table) -> dict:
self.average_stdeviations_x_st_fzp += float(return_table[4])
self.average_stdeviations_y_st_fzp += float(return_table[7])
signals = {
"target_x": {"value": float(return_table[2])},
"average_x_st_fzp": {"value": float(return_table[3])},
"stdev_x_st_fzp": {"value": float(return_table[4])},
"target_y": {"value": float(return_table[5])},
"average_y_st_fzp": {"value": float(return_table[6])},
"stdev_y_st_fzp": {"value": float(return_table[7])},
"average_rotz": {"value": float(return_table[8])},
"stdev_rotz": {"value": float(return_table[9])},
"average_stdeviations_x_st_fzp": {
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
},
"average_stdeviations_y_st_fzp": {
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
},
}
return signals
def stage(self):
self.shutdown_event.clear()
self.scan_done_event.set()
return super().stage()
def start_readout(self, status: DeviceStatus):
self.readout_thread = threading.Thread(
target=self.read_positions_from_sampler, args=(status,)
)
self.readout_thread.start()
def kickoff(self) -> DeviceStatus:
self.shutdown_event.clear()
self.scan_done_event.clear()
while not self.controller._min_scan_buffer_reached and not self.shutdown_event.wait(0.001):
...
self.controller.start_scan()
self.shutdown_event.wait(0.1)
status = DeviceStatus(self)
status.set_finished()
return status
def complete(self) -> DeviceStatus:
"""Wait until the flyer is done."""
if self.scan_done_event.is_set():
# if the scan_done_event is already set, we can return a finished status immediately
status = DeviceStatus(self)
status.set_finished()
return status
status = DeviceStatus(self)
self.start_readout(status)
status.add_callback(lambda *args, **kwargs: self.scan_done_event.set())
return status
def stop(self, *, success=False):
self.shutdown_event.set()
self.scan_done_event.set()
if self.readout_thread is not None:
self.readout_thread.join()
return super().stop(success=success)
if __name__ == "__main__":
rtcontroller = RtFlomniController(
socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222, device_manager=None

View File

@@ -27,20 +27,19 @@ from bec_lib import bec_logger, messages
from bec_lib.alarm_handler import Alarms
from bec_lib.endpoints import MessageEndpoints
from bec_server.scan_server.errors import ScanAbortion
from bec_server.scan_server.scans import SyncFlyScanBase
from bec_server.scan_server.scans import AsyncFlyScanBase
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import TRIGGERSOURCE
logger = bec_logger.logger
class FlomniFermatScan(SyncFlyScanBase):
class FlomniFermatScan(AsyncFlyScanBase):
scan_name = "flomni_fermat_scan"
scan_type = "fly"
required_kwargs = ["fovx", "fovy", "exp_time", "step", "angle"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
use_scan_progress_report = True
def __init__(
self,
@@ -104,6 +103,14 @@ class FlomniFermatScan(SyncFlyScanBase):
self.zshift = -100
self.flomni_rotation_status = None
def scan_report_instructions(self):
"""Scan report instructions for the progress bar"""
yield from self.stubs.scan_report_instruction({"device_progress": ["rt_flyer"]})
@property
def monitor_sync(self) -> str:
return "rt_flyer"
def initialize(self):
self.scan_motors = []
self.update_readout_priority()
@@ -113,10 +120,6 @@ class FlomniFermatScan(SyncFlyScanBase):
self.positions, corridor_size=self.optim_trajectory_corridor
)
@property
def monitor_sync(self):
return "rt_flomni"
def reverse_trajectory(self):
"""
Reverse the trajectory. Every other scan should be reversed to
@@ -290,26 +293,18 @@ class FlomniFermatScan(SyncFlyScanBase):
return np.array(positions)
def scan_core(self):
# use a device message to receive the scan number and
# scan ID before sending the message to the device server
yield from self.stubs.kickoff(device="rtx")
while True:
yield from self.stubs.read(group="monitored")
status = self.connector.get(MessageEndpoints.device_status("rt_scan"))
if status:
status_id = status.content.get("status", 1)
request_id = status.metadata.get("RID")
if status_id == 0 and self.metadata.get("RID") == request_id:
break
if status_id == 2 and self.metadata.get("RID") == request_id:
raise ScanAbortion(
"An error occured during the flomni readout:"
f" {status.metadata.get('error')}"
)
# send off the flyer
yield from self.stubs.kickoff(device="rt_flyer")
# start the readout loop of the flyer
status = yield from self.stubs.complete(device="rt_flyer", wait=False)
# read the monitors until the flyer is done
while not status.done:
yield from self.stubs.read(group="monitored", point_id=self.point_id)
self.point_id += 1
time.sleep(1)
logger.debug("reading monitors")
# yield from self.device_rpc("rtx", "controller.kickoff")
def move_to_start(self):
"""return to the start position"""
@@ -336,6 +331,7 @@ class FlomniFermatScan(SyncFlyScanBase):
yield from self.read_scan_motors()
self.prepare_positions()
yield from self._prepare_setup()
yield from self.scan_report_instructions()
yield from self.open_scan()
yield from self.stage()
yield from self.run_baseline_reading()

View File

@@ -217,6 +217,16 @@ def test_mcs_card_csaxs_complete_and_stop(mock_mcs_csaxs: MCSCardCSAXS):
assert not mcs._start_monitor_async_data_emission.is_set()
def test_mcs_on_stop(mock_mcs_csaxs: MCSCardCSAXS):
"""Test that on stop sets the omit_mca_callbacks flag. Also test that on stage clears the omit_mca_callbacks flag."""
mcs = mock_mcs_csaxs
assert mcs._omit_mca_callbacks.is_set() is False
mcs.stop()
assert mcs._omit_mca_callbacks.is_set() is True
mcs.stage()
assert mcs._omit_mca_callbacks.is_set() is False
def test_mcs_recovery(mock_mcs_csaxs: MCSCardCSAXS):
mcs = mock_mcs_csaxs
# Simulate ongoing acquisition