refactor
CI for csaxs_bec / test (push) Failing after 39s
CI for csaxs_bec / test (pull_request) Failing after 37s

This commit is contained in:
x01dc
2026-04-22 16:33:39 +02:00
committed by holler
parent c1e2722db2
commit 49def798fc
5 changed files with 59 additions and 354 deletions
@@ -1,312 +0,0 @@
import builtins
import time
import numpy as np
from bec_lib import bec_logger
from typeguard import typechecked
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen
logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
class XrayEyeAlign:
# pixel calibration, multiply to get mm
# PIXEL_CALIBRATION = 0.2/209 #.2 with binning
PIXEL_CALIBRATION = 0.2 / 218 # .2 with binning
def __init__(self, client, lamni) -> None:
self.client = client
self.lamni = lamni
self.device_manager = client.device_manager
self.scans = client.scans
self.corr_pos_x = []
self.corr_pos_y = []
self.corr_angle = []
self.corr_pos_x_2 = []
self.corr_pos_y_2 = []
self.corr_angle_2 = []
# ------------------------------------------------------------------
# Correction reset
# ------------------------------------------------------------------
def reset_correction(self):
self.corr_pos_x = []
self.corr_pos_y = []
self.corr_angle = []
def reset_correction_2(self):
self.corr_pos_x_2 = []
self.corr_pos_y_2 = []
self.corr_angle_2 = []
def reset_xray_eye_correction(self):
self.client.delete_global_var("tomo_fit_xray_eye")
# ------------------------------------------------------------------
# FOV offset properties
# ------------------------------------------------------------------
@property
def tomo_fovx_offset(self):
val = self.client.get_global_var("tomo_fov_offset")
if val is None:
return 0.0
return val[0] / 1000
@tomo_fovx_offset.setter
@typechecked
def tomo_fovx_offset(self, val: float):
val_old = self.client.get_global_var("tomo_fov_offset")
if val_old is None:
val_old = [0.0, 0.0]
self.client.set_global_var("tomo_fov_offset", [val * 1000, val_old[1]])
@property
def tomo_fovy_offset(self):
val = self.client.get_global_var("tomo_fov_offset")
if val is None:
return 0.0
return val[1] / 1000
@tomo_fovy_offset.setter
@typechecked
def tomo_fovy_offset(self, val: float):
val_old = self.client.get_global_var("tomo_fov_offset")
if val_old is None:
val_old = [0.0, 0.0]
self.client.set_global_var("tomo_fov_offset", [val_old[0], val * 1000])
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
# ------------------------------------------------------------------
# X-ray eye camera control
# ------------------------------------------------------------------
def update_fov(self, k: int):
self._xray_fov_xy[0] = max(epics_get(f"XOMNYI-XEYE-XWIDTH_X:{k}"), self._xray_fov_xy[0])
self._xray_fov_xy[1] = max(0, self._xray_fov_xy[0])
# ------------------------------------------------------------------
# Alignment procedure
# ------------------------------------------------------------------
# ------------------------------------------------------------------
# Alignment output
# ------------------------------------------------------------------
# ------------------------------------------------------------------
# X-ray eye sinusoidal correction (loaded from MATLAB fit files)
# ------------------------------------------------------------------
def read_xray_eye_correction(self, dir_path=None):
import os
if dir_path is None:
dir_path = os.path.expanduser("~/Data10/specES1/internal/")
tomo_fit_xray_eye = np.zeros((2, 3))
for i, axis in enumerate(["x", "y"]):
for j, coeff in enumerate(["A", "B", "C"]):
with open(os.path.join(dir_path, f"ptychotomoalign_{coeff}{axis}.txt"), "r") as f:
tomo_fit_xray_eye[i][j] = f.readline()
self.client.set_global_var("tomo_fit_xray_eye", tomo_fit_xray_eye.tolist())
# x amp, phase, offset, y amp, phase, offset
# 0 0 0 1 0 2 1 0 1 1 1 2
print("New alignment parameters loaded from X-ray eye")
def read_xray_eye_correction_from_gui(self):
"""Read the sinusoidal fit from the XRayEye GUI widget.
Replaces the MATLAB file read (read_xray_eye_correction) when the
BEC GUI alignment has been performed. Reads fit_params_x and
fit_params_y from the omny_xray_gui device and stores the result as
tomo_fit_xray_eye = [[Ax, Bx, Cx], [Ay, By, Cy]] in the BEC global
variable store.
The stored array is consumed by lamni_compute_additional_correction_xeye_mu().
Important: this method reads from the live GUI widget via the device
(omny_xray_gui). If the XRayEye GUI window has been closed since the
alignment was performed, the fit parameters will no longer be available
and this call will fail. In that case use read_xray_eye_correction()
to reload from the archived text files written by write_output().
"""
import builtins
dev = builtins.__dict__.get("dev")
tomo_fit_xray_eye = np.zeros((2, 3))
params_x = dev.omny_xray_gui.fit_params_x.get()
tomo_fit_xray_eye[0][0] = params_x["SineModel_0_amplitude"]
tomo_fit_xray_eye[0][1] = params_x["SineModel_0_shift"]
tomo_fit_xray_eye[0][2] = params_x["LinearModel_1_intercept"]
params_y = dev.omny_xray_gui.fit_params_y.get()
tomo_fit_xray_eye[1][0] = params_y["SineModel_0_amplitude"]
tomo_fit_xray_eye[1][1] = params_y["SineModel_0_shift"]
tomo_fit_xray_eye[1][2] = params_y["LinearModel_1_intercept"]
self.client.set_global_var("tomo_fit_xray_eye", tomo_fit_xray_eye.tolist())
print("New alignment parameters loaded from XRayEye GUI fit:")
print(
f" X: A={tomo_fit_xray_eye[0][0]:.4f}, "
f"B={tomo_fit_xray_eye[0][1]:.4f}, "
f"C={tomo_fit_xray_eye[0][2]:.4f}"
)
print(
f" Y: A={tomo_fit_xray_eye[1][0]:.4f}, "
f"B={tomo_fit_xray_eye[1][1]:.4f}, "
f"C={tomo_fit_xray_eye[1][2]:.4f}"
)
def read_xray_eye_correction_from_gui(self):
"""Read the sinusoidal fit from the XRayEye GUI widget.
Replaces the MATLAB file read (read_xray_eye_correction) when the
BEC GUI alignment has been performed. Reads fit_params_x and
fit_params_y from the omny_xray_gui device and stores the result as
tomo_fit_xray_eye = [[Ax, Bx, Cx], [Ay, By, Cy]] in the BEC global
variable store.
The stored array is consumed by lamni_compute_additional_correction_xeye_mu().
"""
import builtins
dev = builtins.__dict__.get("dev")
tomo_fit_xray_eye = np.zeros((2, 3))
params_x = dev.omny_xray_gui.fit_params_x.get()
tomo_fit_xray_eye[0][0] = params_x["SineModel_0_amplitude"]
tomo_fit_xray_eye[0][1] = params_x["SineModel_0_shift"]
tomo_fit_xray_eye[0][2] = params_x["LinearModel_1_intercept"]
params_y = dev.omny_xray_gui.fit_params_y.get()
tomo_fit_xray_eye[1][0] = params_y["SineModel_0_amplitude"]
tomo_fit_xray_eye[1][1] = params_y["SineModel_0_shift"]
tomo_fit_xray_eye[1][2] = params_y["LinearModel_1_intercept"]
self.client.set_global_var("tomo_fit_xray_eye", tomo_fit_xray_eye.tolist())
print("New alignment parameters loaded from XRayEye GUI fit:")
print(
f" X: A={tomo_fit_xray_eye[0][0]:.4f}, "
f"B={tomo_fit_xray_eye[0][1]:.4f}, "
f"C={tomo_fit_xray_eye[0][2]:.4f}"
)
print(
f" Y: A={tomo_fit_xray_eye[1][0]:.4f}, "
f"B={tomo_fit_xray_eye[1][1]:.4f}, "
f"C={tomo_fit_xray_eye[1][2]:.4f}"
)
print(
f"X Amplitude {tomo_fit_xray_eye[0][0]}, "
f"X Phase {tomo_fit_xray_eye[0][1]}, "
f"X Offset {tomo_fit_xray_eye[0][2]}, "
f"Y Amplitude {tomo_fit_xray_eye[1][0]}, "
f"Y Phase {tomo_fit_xray_eye[1][1]}, "
f"Y Offset {tomo_fit_xray_eye[1][2]}"
)
def lamni_compute_additional_correction_xeye_mu(self, angle):
"""Compute sinusoidal correction from the X-ray eye fit for the given angle."""
tomo_fit_xray_eye = self.client.get_global_var("tomo_fit_xray_eye")
if tomo_fit_xray_eye is None:
print("Not applying any additional correction. No x-ray eye data available.\n")
return (0, 0)
# x amp, phase, offset, y amp, phase, offset
# 0 0 0 1 0 2 1 0 1 1 1 2
correction_x = (
tomo_fit_xray_eye[0][0] * np.sin(np.radians(angle) + tomo_fit_xray_eye[0][1])
+ tomo_fit_xray_eye[0][2]
) / 1000
correction_y = (
tomo_fit_xray_eye[1][0] * np.sin(np.radians(angle) + tomo_fit_xray_eye[1][1])
+ tomo_fit_xray_eye[1][2]
) / 1000
print(f"Xeye correction x {correction_x}, y {correction_y} for angle {angle}\n")
return (correction_x, correction_y)
# ------------------------------------------------------------------
# Additional lookup-table corrections (iteration 1 and 2)
# ------------------------------------------------------------------
def read_additional_correction(self, correction_file: str):
self.corr_pos_x, self.corr_pos_y, self.corr_angle = self._read_correction_file_xy(
correction_file
)
def read_additional_correction_2(self, correction_file: str):
self.corr_pos_x_2, self.corr_pos_y_2, self.corr_angle_2 = self._read_correction_file_xy(
correction_file
)
def _read_correction_file_xy(self, correction_file: str):
"""Parse a correction file that contains corr_pos_x, corr_pos_y and corr_angle entries."""
with open(correction_file, "r") as f:
num_elements = f.readline()
int_num_elements = int(num_elements.split(" ")[2])
print(int_num_elements)
corr_pos_x = []
corr_pos_y = []
corr_angle = []
for j in range(0, int_num_elements * 3):
line = f.readline()
value = line.split(" ")[2]
name = line.split(" ")[0].split("[")[0]
if name == "corr_pos_x":
corr_pos_x.append(float(value) / 1000)
elif name == "corr_pos_y":
corr_pos_y.append(float(value) / 1000)
elif name == "corr_angle":
corr_angle.append(float(value))
return corr_pos_x, corr_pos_y, corr_angle
def compute_additional_correction(self, angle):
return self._compute_correction_xy(
angle, self.corr_pos_x, self.corr_pos_y, self.corr_angle, label="1"
)
def compute_additional_correction_2(self, angle):
return self._compute_correction_xy(
angle, self.corr_pos_x_2, self.corr_pos_y_2, self.corr_angle_2, label="2"
)
def _compute_correction_xy(self, angle, corr_pos_x, corr_pos_y, corr_angle, label=""):
"""Find the correction for the closest angle in the lookup table."""
if not corr_pos_x:
print(f"Not applying additional correction {label}. No data available.\n")
return (0, 0)
shift_x = corr_pos_x[0]
shift_y = corr_pos_y[0]
angledelta = np.fabs(corr_angle[0] - angle)
for j in range(1, len(corr_pos_x)):
newangledelta = np.fabs(corr_angle[j] - angle)
if newangledelta < angledelta:
shift_x = corr_pos_x[j]
shift_y = corr_pos_y[j]
angledelta = newangledelta
if shift_x == 0 and angle < corr_angle[0]:
shift_x = corr_pos_x[0]
shift_y = corr_pos_y[0]
if shift_x == 0 and angle > corr_angle[-1]:
shift_x = corr_pos_x[-1]
shift_y = corr_pos_y[-1]
print(f"Additional correction shifts {label}: {shift_x} {shift_y}")
return (shift_x, shift_y)
@@ -112,6 +112,20 @@ class LamNI(LamNIAlignmentMixin, LamNIOpticsMixin, LamniGuiTools):
print("Alignment interrupted by user.")
raise exc
# ── Reset manual shifts if needed ─────────────────────────────
mx = self.manual_shift_x
my = self.manual_shift_y
if mx != 0.0 or my != 0.0:
self.manual_shift_x = 0.0
self.manual_shift_y = 0.0
# Use OMNY-style green status message
self.OMNYTools.printgreen(
f"Manual shifts were reset to zero after X-ray eye alignment "
f"(previous values: x={mx:.3f}, y={my:.3f})."
)
def xrayeye_update_frame(self, keep_shutter_open: bool = False):
"""Capture a single fresh X-ray eye frame without running full alignment.
@@ -774,11 +788,12 @@ class LamNI(LamNIAlignmentMixin, LamNIOpticsMixin, LamniGuiTools):
print(f"Stitching overlap = {self.tomo_stitch_overlap}")
print(f"Circular FOV diam <microns> = {self.tomo_circfov}")
print(f"Reconstruction queue name = {self.ptycho_reconstruct_foldername}")
print("FOV offset rotates to find the ROI; manual shift moves the rotation center.")
print(f" _tomo_fovx_offset <mm> = {self.tomo_fovx_offset}")
print(f" _tomo_fovy_offset <mm> = {self.tomo_fovy_offset}")
print(f" _manual_shift_x <mm> = {self.manual_shift_x}")
print(f" _manual_shift_y <mm> = {self.manual_shift_y}")
print("FOV offset rotates to find the ROI; initial values determined in Xrayeye alignment.")
print("manual shift moves the rotation center.")
print(f" _tomo_fovx_offset <mm> = {self.tomo_fovx_offset:.4f}")
print(f" _tomo_fovy_offset <mm> = {self.tomo_fovy_offset:.4f}")
print(f" _manual_shift_x <mm> = {self.manual_shift_x:.4f}")
print(f" _manual_shift_y <mm> = {self.manual_shift_y:.4f}")
print("")
if self.tomo_type == 1:
print("\x1b[1mTomo type 1:\x1b[0m 8 equally spaced sub-tomograms (360 deg)")
@@ -805,8 +820,8 @@ class LamNI(LamNIAlignmentMixin, LamNIOpticsMixin, LamniGuiTools):
print("Repeating projections at 0 deg at start of every second subtomogram.")
print(f"\nSample name: {self.sample_name}\n")
user_input = input("Are these parameters correctly set for your scan? ")
if user_input == "y":
if self.OMNYTools.yesno("Are these parameters correctly set for your scan?", "y"):
print("OK. continue.")
return
@@ -144,6 +144,8 @@ class LaMNIInitStages:
umv(dev.lsamrot, -1)
umv(dev.lsamrot, 0)
self.set_default_lamni_limits()
time.sleep(2)
dev.rtx.controller.feedback_disable_and_even_reset_lamni_angle_interferometer()
@@ -159,6 +161,38 @@ class LaMNIInitStages:
else:
return False
def set_default_lamni_limits(self):
"""
Apply safe, collision-protected default limits for LamNI.
Mirrors legacy SPEC limits.
"""
if not self.OMNYTools.yesno("Set default limits for LamNI?"):
print("Stopping.")
return
print("Setting LamNI limits...")
# Sample stages
dev.lsamx.limits = [6, 14]
dev.lsamy.limits = [6, 14]
dev.lsamrot.limits = [-3, 362]
# Optics (FZP)
dev.loptx.limits = [-1, -0.2]
dev.lopty.limits = [3.0, 3.6]
dev.loptz.limits = [82, 87]
# X-ray eye
dev.leyex.limits = [0, 25]
dev.leyey.limits = [0.5, 50]
# OSA / SmarAct
dev.losax.limits = [-1.5, 0.25]
dev.losay.limits = [-2.5, 4.1]
dev.losaz.limits = [-4.1, -0.5]
print("LamNI limits successfully applied.")
class LamNIOpticsMixin:
"""Optics movement methods: FZP, OSA, central stop and X-ray eye."""
@@ -178,12 +212,12 @@ class LamNIOpticsMixin:
epics_put("XOMNYI-XEYE-ACQ:0", 2)
umv(dev.lsamrot, 0)
umv(dev.dttrz, 5854, dev.fttrz, 2395)
#umv(dev.dttrz, 5854, dev.fttrz, 2395)
def leye_in(self):
bec.queue.next_dataset_number += 1
umv(dev.lsamrot, 0)
umv(dev.dttrz, 6419.677, dev.fttrz, 2959.979)
#umv(dev.dttrz, 6419.677, dev.fttrz, 2959.979)
while True:
moved_out = (input("Did the flight tube move out? (Y/n)") or "y").lower()
if moved_out == "y":
@@ -41,10 +41,6 @@ def umv(*args):
return scans.umv(*args, relative=False)
class FlomniToolsError(Exception):
pass
class FlomniInitError(Exception):
pass
@@ -53,34 +49,6 @@ class FlomniError(Exception):
pass
# class FlomniTools:
# def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
# if autoconfirm and default == "y":
# self.printgreen(message + " Automatically confirming default: yes")
# return True
# elif autoconfirm and default == "n":
# self.printgreen(message + " Automatically confirming default: no")
# return False
# if default == "y":
# message_ending = " [Y]/n? "
# elif default == "n":
# message_ending = " y/[N]? "
# else:
# message_ending = " y/n? "
# while True:
# user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
# if (
# user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes"
# ) or (default == "y" and user_input == ""):
# return True
# if (
# user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No"
# ) or (default == "n" and user_input == ""):
# return False
# else:
# print("Please expicitely confirm y or n.")
class FlomniInitStagesMixin:
def flomni_init_stages(self):
+1 -1
View File
@@ -73,7 +73,7 @@ The sample fine alignment can be obtained using ptychography. For this a short l
#### Shifting the FOV
* `lamni.tomo_fovx/y_offset=value` [mm] will shift the field of view. Perform this adjustment from projections collected at **lsamrot 0 degrees**.
* `lamni.tomo_fovx/y_offset=value` [mm] will shift the field of view. Perform this adjustment from projections collected at **lsamrot 0 degrees**. This shift will rotate. In contrast the manual shift will be a constant shift, identical at all angles.
### Laminography scan