fix(flomni): indentation error in if statement
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m57s
CI for csaxs_bec / test (push) Successful in 1m55s

This commit was merged in pull request #161.
This commit is contained in:
2026-03-18 14:02:44 +01:00
parent 3ca29dd0dd
commit 663d22fff4

View File

@@ -28,6 +28,7 @@ if builtins.__dict__.get("bec") is not None:
dev = builtins.__dict__.get("dev")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
@@ -35,12 +36,15 @@ def umv(*args):
class FlomniToolsError(Exception):
pass
class FlomniInitError(Exception):
pass
class FlomniError(Exception):
pass
# class FlomniTools:
# def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
# if autoconfirm and default == "y":
@@ -84,13 +88,16 @@ class FlomniInitStagesMixin:
else:
return
sensor_voltage_target = dev.ftransy.user_parameter.get("sensor_voltage")
sensor_voltage = float(dev.ftransy.controller.socket_put_and_receive("MG@AN[1]").strip())
if not np.isclose(sensor_voltage, sensor_voltage_target, 0.25):
print(f"Sensor voltage of the gripper is {sensor_voltage}, while target from config is {sensor_voltage_target}")
print("Verify that the value is acceptable and update config file. Reload config and start again.")
print(
f"Sensor voltage of the gripper is {sensor_voltage}, while target from config is {sensor_voltage_target}"
)
print(
"Verify that the value is acceptable and update config file. Reload config and start again."
)
return
print("Starting to drive ftransy to +y limit")
@@ -118,8 +125,9 @@ class FlomniInitStagesMixin:
dev.feyex.limits = [-30, -1]
print("done")
if self.OMNYTools.yesno("Init of foptz. Can the stage move to the upstream limit without collision?"):
if self.OMNYTools.yesno(
"Init of foptz. Can the stage move to the upstream limit without collision?"
):
print("OK. continue.")
else:
return
@@ -173,7 +181,9 @@ class FlomniInitStagesMixin:
dev.fsamy.limits = [2, 3.1]
print("done")
if self.OMNYTools.yesno("Init of tracking stages. Did you remove the outer laser flight tubes?"):
if self.OMNYTools.yesno(
"Init of tracking stages. Did you remove the outer laser flight tubes?"
):
print("OK. continue.")
else:
print("Stopping.")
@@ -206,7 +216,9 @@ class FlomniInitStagesMixin:
print("done")
print("Initializing UPR stage.")
if self.OMNYTools.yesno("To ensure that the end switches work, please check that they are currently not pushed. Is everything okay?"):
if self.OMNYTools.yesno(
"To ensure that the end switches work, please check that they are currently not pushed. Is everything okay?"
):
print("OK. continue.")
else:
print("Stopping.")
@@ -246,7 +258,9 @@ class FlomniInitStagesMixin:
dev.fsamroy.limits = [-5, 365]
print("done")
if self.OMNYTools.yesno("Init of foptx. Can the stage move to the positive limit without collision? Attention: tracker flight tube!"):
if self.OMNYTools.yesno(
"Init of foptx. Can the stage move to the positive limit without collision? Attention: tracker flight tube!"
):
print("OK. continue.")
else:
print("Stopping.")
@@ -425,7 +439,7 @@ class FlomniSampleTransferMixin:
def ftransfer_flomni_stage_in(self):
time.sleep(1)
sample_in_position = dev.flomni_samples.is_sample_slot_used(0)
#bool(float(dev.flomni_samples.sample_placed.sample0.get()))
# bool(float(dev.flomni_samples.sample_placed.sample0.get()))
if not sample_in_position:
raise FlomniError("There is no sample in the sample stage. Aborting.")
self.reset_correction()
@@ -440,7 +454,10 @@ class FlomniSampleTransferMixin:
print("Moving X-ray eye in.")
if self.OMNYTools.yesno("Please confirm that this is ok with the flight tube. This check is to be removed after commissioning", "n"):
if self.OMNYTools.yesno(
"Please confirm that this is ok with the flight tube. This check is to be removed after commissioning",
"n",
):
print("OK. continue.")
else:
print("Stopping.")
@@ -451,7 +468,6 @@ class FlomniSampleTransferMixin:
self.foptics_out()
self.xrayeye_update_frame()
def laser_tracker_show_all(self):
dev.rtx.controller.laser_tracker_show_all()
@@ -554,7 +570,6 @@ class FlomniSampleTransferMixin:
self.flomnigui_show_cameras()
self.ftransfer_gripper_move(position)
self.ftransfer_controller_enable_mount_mode()
@@ -594,7 +609,7 @@ class FlomniSampleTransferMixin:
self.check_sensor_connected()
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
#bool(float(dev.flomni_samples.sample_in_gripper.get()))
# bool(float(dev.flomni_samples.sample_in_gripper.get()))
if not sample_in_gripper:
raise FlomniError("The gripper does not carry a sample.")
@@ -646,7 +661,7 @@ class FlomniSampleTransferMixin:
def ftransfer_sample_change(self, new_sample_position: int):
self.check_tray_in()
# sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
# sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
if sample_in_gripper:
raise FlomniError("There is already a sample in the gripper. Aborting.")
@@ -654,28 +669,30 @@ class FlomniSampleTransferMixin:
self.check_position_is_valid(new_sample_position)
if new_sample_position == 0:
raise FlomniError("The new sample to place cannot be the sample in the sample stage. Aborting.")
raise FlomniError(
"The new sample to place cannot be the sample in the sample stage. Aborting."
)
# sample_placed = getattr(
# dev.flomni_samples.sample_placed, f"sample{new_sample_position}"
# ).get()
# sample_placed = getattr(
# dev.flomni_samples.sample_placed, f"sample{new_sample_position}"
# ).get()
sample_placed = dev.flomni_samples.is_sample_slot_used(new_sample_position)
if not sample_placed:
raise FlomniError(
f"There is currently no sample in position [{new_sample_position}]. Aborting."
)
# sample_in_sample_stage = dev.flomni_samples.sample_placed.sample0.get()
# sample_in_sample_stage = dev.flomni_samples.sample_placed.sample0.get()
sample_in_sample_stage = dev.flomni_samples.is_sample_slot_used(0)
if sample_in_sample_stage:
# find a new home for the sample...
empty_slots = []
# for name, val in dev.flomni_samples.read().items():
# if "flomni_samples_sample_placed_sample" not in name:
# continue
# if val.get("value") == 0:
# empty_slots.append(int(name.split("flomni_samples_sample_placed_sample")[1]))
for j in range(1,20):
# for name, val in dev.flomni_samples.read().items():
# if "flomni_samples_sample_placed_sample" not in name:
# continue
# if val.get("value") == 0:
# empty_slots.append(int(name.split("flomni_samples_sample_placed_sample")[1]))
for j in range(1, 20):
if not dev.flomni_samples.is_sample_slot_used(j):
empty_slots.append(j)
if not empty_slots:
@@ -769,7 +786,7 @@ class FlomniSampleTransferMixin:
def ftransfer_gripper_open(self):
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
#dev.flomni_samples.sample_in_gripper.get()
# dev.flomni_samples.sample_in_gripper.get()
if sample_in_gripper:
raise FlomniError(
"Cannot open gripper. There is still a sample in the gripper! Aborting."
@@ -790,7 +807,10 @@ class FlomniSampleTransferMixin:
fsamx_pos = dev.fsamx.readback.get()
if position == 0 and fsamx_pos > -160:
if self.OMNYTools.yesno("May the flomni stage be moved out for the sample change? Feedback will be disabled and alignment will be lost!", "y"):
if self.OMNYTools.yesno(
"May the flomni stage be moved out for the sample change? Feedback will be disabled and alignment will be lost!",
"y",
):
print("OK. continue.")
self.ftransfer_flomni_stage_out()
else:
@@ -946,7 +966,7 @@ class FlomniSampleTransferMixin:
class FlomniAlignmentMixin:
import csaxs_bec
import os
import os
from pathlib import Path
# Ensure this is a Path object, not a string
@@ -956,10 +976,13 @@ class FlomniAlignmentMixin:
# Build the absolute path correctly
default_correction_file = (
csaxs_bec_basepath.parent / 'bec_ipython_client' / 'plugins' / 'flomni' / default_correction_file_rel
csaxs_bec_basepath.parent
/ "bec_ipython_client"
/ "plugins"
/ "flomni"
/ default_correction_file_rel
).resolve()
def reset_correction(self, use_default_correction=True):
"""
Reset the correction to the default values.
@@ -1031,12 +1054,12 @@ class FlomniAlignmentMixin:
else:
params = dev.omny_xray_gui.fit_params_x.get()
#amplitude
tomo_alignment_fit[0][0] = params['SineModel_0_amplitude']
#phase
tomo_alignment_fit[0][1] = params['SineModel_0_shift']
#offset
tomo_alignment_fit[0][2] = params['LinearModel_1_intercept']
# amplitude
tomo_alignment_fit[0][0] = params["SineModel_0_amplitude"]
# phase
tomo_alignment_fit[0][1] = params["SineModel_0_shift"]
# offset
tomo_alignment_fit[0][2] = params["LinearModel_1_intercept"]
print("applying vertical default values from mirror calibration, not from fit!")
tomo_alignment_fit[1][0] = 0
tomo_alignment_fit[1][1] = 0
@@ -1045,7 +1068,6 @@ class FlomniAlignmentMixin:
tomo_alignment_fit[1][4] = 0
print("New alignment parameters loaded based on Xray eye alignment GUI:")
print(
f"X Amplitude {tomo_alignment_fit[0][0]}, "
f"X Phase {tomo_alignment_fit[0][1]}, "
@@ -1192,7 +1214,7 @@ class Flomni(
FlomniAlignmentMixin,
FlomniOpticsMixin,
cSAXSBeamlineChecks,
flomniGuiTools
flomniGuiTools,
):
def __init__(self, client):
super().__init__()
@@ -1224,7 +1246,10 @@ class Flomni(
def start_x_ray_eye_alignment(self, keep_shutter_open=False):
if self.OMNYTools.yesno("Starting Xrayeye alignment. Deleting any potential existing alignment for this sample.", "y"):
if self.OMNYTools.yesno(
"Starting Xrayeye alignment. Deleting any potential existing alignment for this sample.",
"y",
):
self.align = XrayEyeAlign(self.client, self)
try:
self.align.align(keep_shutter_open)
@@ -1236,7 +1261,7 @@ class Flomni(
umv(dev.fsamx, fsamx_in)
raise exc
def xrayeye_update_frame(self,keep_shutter_open=False):
def xrayeye_update_frame(self, keep_shutter_open=False):
self.align.update_frame(keep_shutter_open)
def xrayeye_alignment_start(self, keep_shutter_open=False):
@@ -1476,13 +1501,12 @@ class Flomni(
dev = builtins.__dict__.get("dev")
bec = builtins.__dict__.get("bec")
self.feye_out()
tags = ["BEC_alignment_tomo", self.sample_name]
self.write_to_scilog(
f"Starting alignment scan. First scan number: {bec.queue.next_scan_number}.", tags
)
start_angle = 0
angle_end = start_angle + 180
@@ -1504,15 +1528,17 @@ class Flomni(
error_caught = True
else:
raise exc
#todo here was if blchk success, then setting to success true
# todo here was if blchk success, then setting to success true
successful = True
end_scan_number = bec.queue.next_scan_number
for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, 0)
umv(dev.fsamroy, 0)
self.OMNYTools.printgreenbold("\n\nAlignment scan finished. Please run SPEC_ptycho_align and load the new fit.")
self.OMNYTools.printgreenbold(
"\n\nAlignment scan finished. Please run SPEC_ptycho_align and load the new fit."
)
def _write_subtomo_to_scilog(self, subtomo_number):
dev = builtins.__dict__.get("dev")
@@ -1567,20 +1593,19 @@ class Flomni(
elif subtomo_number == 8:
start_angle = self.tomo_angle_stepsize / 8.0 * 7
# _tomo_shift_angles (potential global variable)
_tomo_shift_angles = 0
# compute number of projections
start = start_angle + _tomo_shift_angles
if subtomo_number % 2: # odd = forward
if subtomo_number % 2: # odd = forward
max_allowed_angle = 180.05 + self.tomo_angle_stepsize
proposed_end = start + 180
angle_end = min(proposed_end, max_allowed_angle)
span = angle_end - start
else: # even = reverse
else: # even = reverse
min_allowed_angle = 0
proposed_end = start - 180
angle_end = max(proposed_end, min_allowed_angle)
@@ -1589,38 +1614,23 @@ class Flomni(
# number of projections needed to maintain step size
N = int(span / self.tomo_angle_stepsize) + 1
angles = np.linspace(
start,
angle_end,
num=N,
endpoint=True,
)
angles = np.linspace(start, angle_end, num=N, endpoint=True)
if subtomo_number % 2: # odd subtomos → forward direction
if subtomo_number % 2: # odd subtomos → forward direction
# clamp end angle to max allowed
max_allowed_angle = 180.05 + self.tomo_angle_stepsize
proposed_end = start + 180
angle_end = min(proposed_end, max_allowed_angle)
angles = np.linspace(
start,
angle_end,
num=N,
endpoint=True,
)
angles = np.linspace(start, angle_end, num=N, endpoint=True)
else: # even subtomos → reverse direction
else: # even subtomos → reverse direction
# go FROM start_angle down toward 0
min_allowed_angle = 0
proposed_end = start - 180
angle_end = max(proposed_end, min_allowed_angle)
angles = np.linspace(
start,
angle_end,
num=N,
endpoint=True,
)
angles = np.linspace(start, angle_end, num=N, endpoint=True)
for i, angle in enumerate(angles):
@@ -1636,9 +1646,9 @@ class Flomni(
self._subtomo_offset = 0
else:
if subtomo_number % 2: # odd = forward direction
if subtomo_number % 2: # odd = forward direction
self._subtomo_offset = round(sa / step)
else: # even = reverse direction
else: # even = reverse direction
self._subtomo_offset = round((180 - sa) / step)
# progress index must always increase
@@ -1647,14 +1657,15 @@ class Flomni(
# existing progress fields
self.progress["subtomo_total_projections"] = int(180 / self.tomo_angle_stepsize)
self.progress["projection"] = (subtomo_number - 1) * self.progress["subtomo_total_projections"] + self.progress["subtomo_projection"]
self.progress["projection"] = (subtomo_number - 1) * self.progress[
"subtomo_total_projections"
] + self.progress["subtomo_projection"]
self.progress["total_projections"] = 180 / self.tomo_angle_stepsize * 8
self.progress["angle"] = angle
# finally do the scan at this angle
self._tomo_scan_at_angle(angle, subtomo_number)
def _tomo_scan_at_angle(self, angle, subtomo_number):
successful = False
error_caught = False
@@ -1662,7 +1673,7 @@ class Flomni(
print(f"Starting flOMNI scan for angle {angle} in subtomo {subtomo_number}")
self._print_progress()
while not successful:
#self.bl_chk._bl_chk_start()
# self.bl_chk._bl_chk_start()
if not self.special_angles:
self._current_special_angles = []
if self._current_special_angles:
@@ -1697,9 +1708,11 @@ class Flomni(
"""start a tomo scan"""
if not self._check_eye_out_and_optics_in():
print("Attention: The setup is not in measurement condition.\nXray eye might be IN or the Xray optics OUT.")
print(
"Attention: The setup is not in measurement condition.\nXray eye might be IN or the Xray optics OUT."
)
if self.OMNYTools.yesno("Shall I continue?", "n"):
print("OK")
print("OK")
else:
print("Stopping.")
return
@@ -1738,7 +1751,7 @@ class Flomni(
for ii in range(subtomo_start, 9):
self.sub_tomo_scan(ii, start_angle=start_angle)
start_angle = None
elif self.tomo_type == 2:
# Golden ratio tomography
previous_subtomo_number = -1
@@ -1834,9 +1847,9 @@ class Flomni(
else:
raise FlomniError("undefined tomo type")
self.progress['projection'] = self.progress['total_projections']
self.progress["projection"] = self.progress["total_projections"]
self.progress["subtomo_projection"] = self.progress["subtomo_total_projections"]
self._print_progress()
self._print_progress()
self.OMNYTools.printgreenbold("Tomoscan finished")
def _print_progress(self):
@@ -1930,9 +1943,7 @@ class Flomni(
)
def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None:
tomo_scan_numbers_file = os.path.expanduser(
"~/data/raw/logs/tomography_scannumbers.txt"
)
tomo_scan_numbers_file = os.path.expanduser("~/data/raw/logs/tomography_scannumbers.txt")
with open(tomo_scan_numbers_file, "a+") as out_file:
# pylint: disable=undefined-variable
out_file.write(
@@ -1943,7 +1954,6 @@ class Flomni(
dev.rtx.controller.laser_tracker_check_signalstrength()
scans = builtins.__dict__.get("scans")
# additional_correction = self.align.compute_additional_correction(angle)
@@ -2033,7 +2043,6 @@ class Flomni(
)
print(f"\nSample name: {self.sample_name}\n")
if self.OMNYTools.yesno("Are these parameters correctly set for your scan?", "y"):
print("... excellent!")
else:
@@ -2181,4 +2190,4 @@ if __name__ == "__main__":
builtins.__dict__["bec"] = bec
builtins.__dict__["umv"] = umv
flomni = Flomni(bec)
flomni.start_x_ray_eye_alignment()
flomni.start_x_ray_eye_alignment()