fixes/lamni_improvements #141

Merged
holler merged 3 commits from fixes/lamni_improvements into main 2026-02-23 12:27:14 +01:00
5 changed files with 577 additions and 73 deletions

View File

@@ -6,6 +6,8 @@ from rich.console import Console
from rich.table import Table
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_put, fshclose
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
# import builtins to avoid linter errors
dev = builtins.__dict__.get("dev")
@@ -18,25 +20,32 @@ class LamNIInitError(Exception):
class LaMNIInitStagesMixin:
def __init__(self, client):
super().__init__()
self.client = client
self.OMNYTools = OMNYTools(self.client)
def lamni_init_stages(self):
user_input = input("Starting initialization of LamNI stages. OK? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("Start initialization of LamNI stages. OK?"):
print("staring...")
dev.lsamrot.enabled = True
else:
return
if self.check_all_axes_of_lamni_referenced():
user_input = input("Continue anyways? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("All axes are referenced. Continue anyways?"):
print("ok then...")
else:
return
axis_id_lsamrot = dev.lsamrot._config["deviceConfig"].get("axis_Id")
if dev.lsamrot.controller.get_motor_limit_switch(axis_id_lsamrot)[1] == False:
user_input = input("The rotation stage will be moved to one limit [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("The rotation stage will be moved to one limit"):
print("starting...")
else:
return
@@ -47,10 +56,7 @@ class LaMNIInitStagesMixin:
print("The controller will be disabled in bec. To enable dev.lsamrot.enabled=True")
return
user_input = input(
"Init of loptz. Can the stage move to the upstream limit without collision?? [y/n]"
)
if user_input == "y":
if self.OMNYTools.yesno("Init of loptz. Can the stage move to the upstream limit without collision?"):
print("ok then...")
else:
return
@@ -81,8 +87,7 @@ class LaMNIInitStagesMixin:
time.sleep(0.1)
self.find_reference_mark(dev.lsamrot)
user_input = input("Init of leye. Can the stage move to -x limit without collision? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("Init of leye. Can the stage move to -x limit without collision?"):
print("starting...")
else:
return
@@ -134,8 +139,7 @@ class LaMNIInitStagesMixin:
return ord(axis_id.lower()) - 97
def _align_setup(self):
user_input = input("Start moving stages to default initial positions? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("Start moving stages to default initial positions?"):
print("Start moving stages...")
else:
print("Stopping.")

View File

@@ -15,13 +15,19 @@ from bec_lib.pdf_writer import PDFWriter
from typeguard import typechecked
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
from .lamni_optics_mixin import LaMNIInitStagesMixin, LamNIOpticsMixin
logger = bec_logger.logger
bec = builtins.__dict__.get("bec")
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
class XrayEyeAlign:
# pixel calibration, multiply to get mm
# PIXEL_CALIBRATION = 0.2/209 #.2 with binning
@@ -510,8 +516,9 @@ class LamNI(LamNIOpticsMixin):
def __init__(self, client):
super().__init__()
self.client = client
self.device_manager = client.device_manager
self.align = XrayEyeAlign(client, self)
self.init = LaMNIInitStagesMixin()
self.init = LaMNIInitStagesMixin(client)
self.check_shutter = True
self.check_light_available = True
self.check_fofb = True
@@ -524,6 +531,16 @@ class LamNI(LamNIOpticsMixin):
self._beam_is_okay = True
self._stop_beam_check_event = None
self.beam_check_thread = None
self.OMNYTools = OMNYTools(self.client)
# Progress tracking
self.progress = {}
self.progress["tomo_type"] = "Equally spaced sub-tomograms"
self.progress["subtomo"] = 0
self.progress["subtomo_projection"] = 0
self.progress["subtomo_total_projections"] = 1
self.progress["projection"] = 0
self.progress["total_projections"] = 1
self.progress["angle"] = 0
def get_beamline_checks_enabled(self):
print(
@@ -598,12 +615,12 @@ class LamNI(LamNIOpticsMixin):
if val == 1:
# equally spaced tomography with 8 sub tomograms
self.client.set_global_var("tomo_type", val)
# elif val == 2:
# # golden ratio tomography (sorted bunches)
# self.client.set_global_var("tomo_type", val)
# elif val == 3:
# # equally spaced tomography with starting angles shifted by golden ratio
# self.client.set_global_var("tomo_type", val)
elif val == 2:
# golden ratio tomography (sorted bunches)
self.client.set_global_var("tomo_type", val)
elif val == 3:
# equally spaced tomography with starting angles shifted by golden ratio
self.client.set_global_var("tomo_type", val)
else:
raise ValueError("Unknown tomo_type.")
@@ -740,6 +757,41 @@ class LamNI(LamNIOpticsMixin):
def tomo_stitch_overlap(self, val: float):
self.client.set_global_var("tomo_stitch_overlap", val)
@property
def golden_max_number_of_projections(self):
val = self.client.get_global_var("golden_max_number_of_projections")
if val is None:
return 1000.0
return val
@golden_max_number_of_projections.setter
def golden_max_number_of_projections(self, val: float):
self.client.set_global_var("golden_max_number_of_projections", val)
@property
def golden_ratio_bunch_size(self):
val = self.client.get_global_var("golden_ratio_bunch_size")
if val is None:
return 20
return val
@golden_ratio_bunch_size.setter
def golden_ratio_bunch_size(self, val: float):
if val < 20:
raise ValueError("golden_ratio_bunch_size must be at least 20.")
self.client.set_global_var("golden_ratio_bunch_size", val)
@property
def golden_projections_at_0_deg_for_damage_estimation(self):
val = self.client.get_global_var("golden_projections_at_0_deg_for_damage_estimation")
if val is None:
return 0
return val
@golden_projections_at_0_deg_for_damage_estimation.setter
def golden_projections_at_0_deg_for_damage_estimation(self, val: float):
self.client.set_global_var("golden_projections_at_0_deg_for_damage_estimation", val)
@property
def sample_name(self):
val = self.client.get_global_var("sample_name")
@@ -909,6 +961,49 @@ class LamNI(LamNIOpticsMixin):
except Exception:
logger.warning("Failed to send update to SciLog.")
def rt_off(self):
dev.rtx.enabled = False
dev.rty.enabled = False
def rt_on(self):
dev.rtx.enabled = True
dev.rty.enabled = True
if dev.rtx.enabled == True:
print("rt is enabled")
else:
print("failed to enable rt")
def feedback_enable_with_reset(self):
self.device_manager.devices.rtx.controller.feedback_enable_with_reset()
self.feedback_status()
def feedback_enable_without_reset(self):
self.device_manager.devices.rtx.controller.feedback_enable_without_reset()
self.feedback_status()
def feedback_disable(self):
self.device_manager.devices.rtx.controller.feedback_disable()
self.feedback_status()
def feedback_disable_and_reset_angle(self):
self.device_manager.devices.rtx.controller.feedback_disable_and_even_reset_lamni_angle_interferometer()
self.feedback_status()
def feedback_status(self):
if self.device_manager.devices.rtx.controller.feedback_is_running():
print("The rt feedback is \x1b[92mrunning\x1b[0m.")
else:
print("The rt feedback is \x1b[91mNOT\x1b[0m running.")
def show_interferometer_positions(self):
self.device_manager.devices.rtx.controller.show_interferometer_positions()
def show_signal_strength(self):
self.device_manager.devices.rtx.controller.show_signal_strength_interferometer()
def show_analog_signals(self):
return self.device_manager.devices.rtx.controller.show_analog_signals()
def add_sample_database(
self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user
):
@@ -935,9 +1030,8 @@ class LamNI(LamNIOpticsMixin):
# self.tomo_scan_projection(angle)
# self.tomo_reconstruct()
def sub_tomo_scan(self, subtomo_number, start_angle=None):
"""start a subtomo"""
dev = builtins.__dict__.get("dev")
def _write_subtomo_to_scilog(self, subtomo_number):
"""Write subtomo start information to scilog."""
bec = builtins.__dict__.get("bec")
if self.tomo_id > 0:
tags = ["BEC_subtomo", self.sample_name, f"tomo_id_{self.tomo_id}"]
@@ -948,6 +1042,10 @@ class LamNI(LamNIOpticsMixin):
tags,
)
def sub_tomo_scan(self, subtomo_number, start_angle=None):
"""start a subtomo"""
self._write_subtomo_to_scilog(subtomo_number)
if start_angle is None:
if subtomo_number == 1:
start_angle = 0
@@ -980,13 +1078,14 @@ class LamNI(LamNIOpticsMixin):
if not (subtomo_number % 2):
angles = np.flip(angles)
for angle in angles:
self.progress["tomo_type"] = "Equally spaced sub-tomograms"
self.progress["subtomo"] = subtomo_number
self.progress["subtomo_projection"] = angles.index(angle)
self.progress["subtomo_total_projections"] = 180 / self.tomo_angle_stepsize
self.progress["subtomo_projection"] = np.where(angles == angle)[0][0]
self.progress["subtomo_total_projections"] = 360 / self.tomo_angle_stepsize
self.progress["projection"] = (subtomo_number - 1) * self.progress[
"subtomo_total_projections"
] + self.progress["subtomo_projection"]
self.progress["total_projections"] = 180 / self.tomo_angle_stepsize * 8
self.progress["total_projections"] = 360 / self.tomo_angle_stepsize * 8
self.progress["angle"] = angle
self._tomo_scan_at_angle(angle, subtomo_number)
@@ -1049,6 +1148,11 @@ class LamNI(LamNIOpticsMixin):
for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
if self._was_beam_okay() and not error_caught:
successful = True
else:
self._wait_for_beamline_checks()
def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None:
tomo_scan_numbers_file = os.path.expanduser(
"~/Data10/specES1/dat-files/tomography_scannumbers.txt"
@@ -1059,13 +1163,74 @@ class LamNI(LamNIOpticsMixin):
f"{scan_number} {angle} {dev.lsamrot.read()['lsamrot']['value']:.3f} {self.tomo_id} {subtomo_number} {0} {'lamni'}\n"
)
def tomo_scan(self, subtomo_start=1, start_angle=None):
"""start a tomo scan"""
def _golden(self, ii, howmany_sorted, maxangle=360, reverse=False):
"""Return the ii-th golden ratio angle within sorted bunches of size howmany_sorted,
and its subtomo number. Operates over maxangle degrees (360 for LamNI)."""
golden = []
for iji in range(
(ii - (ii % howmany_sorted)), (ii - (ii % howmany_sorted)) + howmany_sorted, 1
):
golden.append(
((iji * maxangle * (1 + pow(5, 0.5)) / 2) * 1000 % (maxangle * 1000)) / 1000
)
golden.sort()
subtomo_number = int(ii / howmany_sorted) + 1
if reverse and not subtomo_number % 2:
golden.reverse()
return (golden[ii % howmany_sorted], subtomo_number)
def _golden_equally_spaced(self, ii, number_of_projections_per_subtomo, maxangle=360, reverse=True, verbose=False):
"""Return angles for equally spaced tomography with sub-tomogram starting angles
shifted according to the golden ratio. Operates over maxangle degrees (360 for LamNI).
ii is the projection number starting at 0."""
angular_step = maxangle / number_of_projections_per_subtomo
subtomo_number = int(((ii) * angular_step) / maxangle) + 1
start_angle = self._golden(subtomo_number - 1, 1, angular_step)[0]
projection_number_of_subtomo = (
ii - (subtomo_number - 1) * number_of_projections_per_subtomo
)
if reverse:
if subtomo_number % 2:
angle = start_angle + projection_number_of_subtomo * angular_step
else:
angle = (
start_angle
+ (number_of_projections_per_subtomo - 1) * angular_step
- projection_number_of_subtomo * angular_step
)
else:
angle = start_angle + projection_number_of_subtomo * angular_step
if verbose:
print(
f"Equally spaced golden ratio tomography.\n"
f"Angular step: {angular_step}\n"
f"Subtomo Number: {subtomo_number}\n"
f"Angle: {angle}"
)
return angle, subtomo_number
def tomo_scan(self, subtomo_start=1, start_angle=None, projection_number=None):
"""Start a tomo scan.
Args:
subtomo_start (int): For tomo_type 1, the sub-tomogram number to start from. Defaults to 1.
start_angle (float, optional): Override the starting angle of the first sub-tomogram. Defaults to None.
projection_number (int, optional): For tomo_types 2 and 3, resume from this projection index. Defaults to None.
"""
bec = builtins.__dict__.get("bec")
scans = builtins.__dict__.get("scans")
self._current_special_angles = self.special_angles.copy()
if self.tomo_type == 1 and subtomo_start == 1 and start_angle is None:
# Register a new tomo scan in the database and write the PDF report
# only when starting fresh (not resuming mid-scan)
if (
(self.tomo_type == 1 and subtomo_start == 1 and start_angle is None)
or (self.tomo_type == 2 and projection_number is None)
or (self.tomo_type == 3 and projection_number is None)
):
# pylint: disable=undefined-variable
self.tomo_id = self.add_sample_database(
self.sample_name,
@@ -1077,10 +1242,108 @@ class LamNI(LamNIOpticsMixin):
"BEC",
)
self.write_pdf_report()
with scans.dataset_id_on_hold:
for ii in range(subtomo_start, 9):
self.sub_tomo_scan(ii, start_angle=start_angle)
start_angle = None
if self.tomo_type == 1:
# 8 equally spaced sub-tomograms over 360 degrees
self.progress["tomo_type"] = "Equally spaced sub-tomograms"
for ii in range(subtomo_start, 9):
self.sub_tomo_scan(ii, start_angle=start_angle)
start_angle = None
elif self.tomo_type == 2:
# Golden ratio tomography (sorted bunches) over 360 degrees
self.progress["tomo_type"] = "Golden ratio tomography"
previous_subtomo_number = -1
ii = 0 if projection_number is None else projection_number
while True:
angle, subtomo_number = self._golden(
ii, self.golden_ratio_bunch_size, maxangle=360, reverse=True
)
if previous_subtomo_number != subtomo_number:
self._write_subtomo_to_scilog(subtomo_number)
if (
subtomo_number % 2 == 1
and ii > 10
and self.golden_projections_at_0_deg_for_damage_estimation == 1
):
self._tomo_scan_at_angle(0, subtomo_number)
previous_subtomo_number = subtomo_number
self.progress["subtomo"] = subtomo_number
self.progress["projection"] = ii
self.progress["angle"] = angle
if self.golden_ratio_bunch_size > 0:
self.progress["subtomo_total_projections"] = self.golden_ratio_bunch_size
self.progress["subtomo_projection"] = (
ii - (subtomo_number - 1) * self.golden_ratio_bunch_size
)
else:
self.progress["subtomo_total_projections"] = 0
self.progress["subtomo_projection"] = 0
self.progress["total_projections"] = (
self.golden_max_number_of_projections
if self.golden_max_number_of_projections > 0
else 0
)
self._tomo_scan_at_angle(angle, subtomo_number)
ii += 1
if (
self.golden_max_number_of_projections > 0
and ii > self.golden_max_number_of_projections
):
print(
f"Golden ratio tomography stopped automatically after the requested"
f" {self.golden_max_number_of_projections} projections."
)
break
elif self.tomo_type == 3:
# Equally spaced tomography with golden ratio starting angles over 360 degrees
self.progress["tomo_type"] = "Equally spaced, golden ratio starting angles"
previous_subtomo_number = -1
ii = 0 if projection_number is None else projection_number
while True:
angle, subtomo_number = self._golden_equally_spaced(
ii, int(360 / self.tomo_angle_stepsize), maxangle=360, reverse=True
)
if previous_subtomo_number != subtomo_number:
self._write_subtomo_to_scilog(subtomo_number)
if (
subtomo_number % 2 == 1
and ii > 10
and self.golden_projections_at_0_deg_for_damage_estimation == 1
):
self._tomo_scan_at_angle(0, subtomo_number)
previous_subtomo_number = subtomo_number
self.progress["subtomo"] = subtomo_number
self.progress["projection"] = ii
self.progress["angle"] = angle
self.progress["subtomo_total_projections"] = 360 / self.tomo_angle_stepsize
self.progress["subtomo_projection"] = (
ii - (subtomo_number - 1) * self.progress["subtomo_total_projections"]
)
self.progress["total_projections"] = (
self.golden_max_number_of_projections
if self.golden_max_number_of_projections > 0
else 0
)
self._tomo_scan_at_angle(angle, subtomo_number)
ii += 1
if (
self.golden_max_number_of_projections > 0
and ii > self.golden_max_number_of_projections
):
print(
f"Golden ratio tomography stopped automatically after the requested"
f" {self.golden_max_number_of_projections} projections."
)
break
else:
raise ValueError(f"Unknown tomo_type: {self.tomo_type}.")
def tomo_parameters(self):
"""print and update the tomo parameters"""
@@ -1103,9 +1366,37 @@ class LamNI(LamNIOpticsMixin):
print(f" _tomo_fovy_offset <mm> = {self.align.tomo_fovy_offset}")
print(f" _manual_shift_x <mm> = {self.manual_shift_x}")
print(f" _manual_shift_y <mm> = {self.manual_shift_y}")
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
print(f"Resulting in number of projections: {360/self.tomo_angle_stepsize*8}")
print(f"Sample name: {self.sample_name}\n")
print("")
if self.tomo_type == 1:
print("\x1b[1mTomo type 1:\x1b[0m 8 equally spaced sub-tomograms")
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
print(f"Resulting in number of projections: {360/self.tomo_angle_stepsize*8}")
elif self.tomo_type == 2:
print("\x1b[1mTomo type 2:\x1b[0m Golden ratio tomography")
print(f"Sorted in bunches of: {self.golden_ratio_bunch_size}")
if self.golden_max_number_of_projections > 0:
print(f"Ending after {self.golden_max_number_of_projections} projections.")
else:
print("Ending by manual interruption.")
if self.golden_projections_at_0_deg_for_damage_estimation == 1:
print(
"Repeating projections at 0 degrees at the beginning of every second subtomogram."
)
elif self.tomo_type == 3:
print(
"\x1b[1mTomo type 3:\x1b[0m Equally spaced tomography, golden ratio starting angle"
)
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
print(f"Number of projections per sub-tomogram: {360/self.tomo_angle_stepsize}")
if self.golden_max_number_of_projections > 0:
print(f"Ending after {self.golden_max_number_of_projections} projections.")
else:
print("Ending by manual interruption.")
if self.golden_projections_at_0_deg_for_damage_estimation == 1:
print(
"Repeating projections at 0 degrees at the beginning of every second subtomogram."
)
print(f"\nSample name: {self.sample_name}\n")
user_input = input("Are these parameters correctly set for your scan? ")
if user_input == "y":
@@ -1125,13 +1416,61 @@ class LamNI(LamNIOpticsMixin):
self.ptycho_reconstruct_foldername = self._get_val(
"Reconstruction queue ", self.ptycho_reconstruct_foldername, str
)
tomo_numberofprojections = self._get_val(
"Number of projections", 360 / self.tomo_angle_stepsize * 8, int
)
print(f"The angular step will be {360/tomo_numberofprojections}")
self.tomo_angle_stepsize = 360 / tomo_numberofprojections * 8
print(f"The angular step in a subtomogram it will be {self.tomo_angle_stepsize}")
print("Tomography type:")
print(" 1: 8 equally spaced sub-tomograms (360 deg)")
print(" 2: Golden ratio tomography")
print(" 3: Equally spaced tomography, golden ratio starting angle")
self.tomo_type = self._get_val("Tomography type", self.tomo_type, int)
if self.tomo_type == 1:
tomo_numberofprojections = self._get_val(
"Number of projections", 360 / self.tomo_angle_stepsize * 8, int
)
print(f"The angular step will be {360/tomo_numberofprojections}")
self.tomo_angle_stepsize = 360 / tomo_numberofprojections * 8
print(f"The angular step in a subtomogram it will be {self.tomo_angle_stepsize}")
elif self.tomo_type == 2:
while True:
bunch_size = self._get_val(
"Number of projections sorted per bunch (minimum 20)",
self.golden_ratio_bunch_size,
int,
)
if bunch_size >= 20:
self.golden_ratio_bunch_size = bunch_size
break
print("Bunch size must be at least 20. Please try again.")
self.golden_max_number_of_projections = self._get_val(
"Stop after number of projections (0 for endless)",
self.golden_max_number_of_projections,
int,
)
self.golden_projections_at_0_deg_for_damage_estimation = self._get_val(
"Repeat projections at 0 deg every second subtomo 1/0?",
self.golden_projections_at_0_deg_for_damage_estimation,
int,
)
elif self.tomo_type == 3:
numprj = self._get_val(
"Number of projections per sub-tomogram",
int(360 / self.tomo_angle_stepsize),
int,
)
self.tomo_angle_stepsize = 360 / numprj
self.golden_max_number_of_projections = self._get_val(
"Stop after number of projections (0 for endless)",
self.golden_max_number_of_projections,
int,
)
self.golden_projections_at_0_deg_for_damage_estimation = self._get_val(
"Repeat projections at 0 deg every second subtomo 1/0?",
self.golden_projections_at_0_deg_for_damage_estimation,
int,
)
self.sample_name = self._get_val("sample name", self.sample_name, str)
@staticmethod
@@ -1197,6 +1536,7 @@ class LamNI(LamNIOpticsMixin):
(
f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n"
),
f"{'Tomo type:':<{padding}}{self.tomo_type:>{padding}}\n",
]
content = "".join(content)
user_target = os.path.expanduser(f"~/Data10/documentation/tomo_scan_ID_{self.tomo_id}.pdf")
@@ -1389,3 +1729,4 @@ class DataDrivenLamNI(LamNI):
shapes.append(data.shape)
if len(set(shapes)) > 1:
raise ValueError(f"Tomo data file has entries of inconsistent lengths: {shapes}.")

View File

@@ -35,32 +35,32 @@ class FlomniInitError(Exception):
class FlomniError(Exception):
pass
class FlomniTools:
def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
if autoconfirm and default == "y":
self.printgreen(message + " Automatically confirming default: yes")
return True
elif autoconfirm and default == "n":
self.printgreen(message + " Automatically confirming default: no")
return False
if default == "y":
message_ending = " [Y]/n? "
elif default == "n":
message_ending = " y/[N]? "
else:
message_ending = " y/n? "
while True:
user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
if (
user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes"
) or (default == "y" and user_input == ""):
return True
if (
user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No"
) or (default == "n" and user_input == ""):
return False
else:
print("Please expicitely confirm y or n.")
# class FlomniTools:
# def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
# if autoconfirm and default == "y":
# self.printgreen(message + " Automatically confirming default: yes")
# return True
# elif autoconfirm and default == "n":
# self.printgreen(message + " Automatically confirming default: no")
# return False
# if default == "y":
# message_ending = " [Y]/n? "
# elif default == "n":
# message_ending = " y/[N]? "
# else:
# message_ending = " y/n? "
# while True:
# user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
# if (
# user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes"
# ) or (default == "y" and user_input == ""):
# return True
# if (
# user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No"
# ) or (default == "n" and user_input == ""):
# return False
# else:
# print("Please expicitely confirm y or n.")

View File

@@ -25,6 +25,34 @@ logger = bec_logger.logger
class LamniGalilController(GalilController):
# ============================================================
# Error status
# ============================================================
caperr_bits = {
0x01: "Cap1 outside expected left-stop range (early check)",
0x02: "Cap2 outside expected left-stop range (early check)",
0x04: "Cap1 too low during pressure-off check (near right boundary)",
0x08: "Cap2 too low during pressure-off check (near right boundary)",
0x10: "Cap1 exceeded allowed left-stop boundary during movement",
0x20: "Cap2 exceeded allowed left-stop boundary during movement (disabled in code)",
0x40: "Cap1 did not respond to test movement",
0x80: "Cap2 did not respond to test movement"
}
allaxrer_table = {
1: "Not all axes referenced after reference search",
2: "Pressure-loss emergency stop (pressure 14/15 active while motor C off)",
3: "Unexpected pressure OFF while soft-limits not yet set",
4: "Pressure valve mismatch (OUT13=0 but IN13=1)",
5: "Capacitive sensor boundary violations (caperr > 0)",
6: "Emergency Stop triggered (IN[5]=0)",
7: "Following error detected on one or more axes"
}
USER_ACCESS = [
"describe",
"show_running_threads",
@@ -37,6 +65,8 @@ class LamniGalilController(GalilController):
"get_motor_limit_switch",
"is_motor_on",
"all_axes_referenced",
"lamni_lights_off",
"lamni_lights_on"
]
def show_status_other(self):
@@ -60,6 +90,47 @@ class LamniGalilController(GalilController):
print("There is air pressure at the outer rotation radial.")
swver = float(self.socket_put_and_receive("MGswver"))
print(f"Lgalil LAMNI firmware version {swver:2.0f}.")
allaxref = int(float(self.socket_put_and_receive("MGallaxref")))
print(f"Error statuts:")
if allaxref == 1:
print(f"Allaxref = 1, all OK.")
else:
print(f"Allaxref = {allaxref}. Not all axes are referenced or error introduced preventing motion.")
allaxrer = int(float(self.socket_put_and_receive("MGallaxrer")))
print("\nallaxrer =", allaxrer)
print(self.decode_allaxrer(allaxrer))
caperr = int(float(self.socket_put_and_receive("MGcaperr")))
print("\nDecoding caperr =", caperr)
self.visualize_caperr(caperr)
def decode_allaxrer(self, code: int) -> str:
"""Return human-readable meaning of allaxrer code."""
return self.allaxrer_table.get(code, "Unknown allaxrer code")
def visualize_caperr(self, mask: int):
"""Pretty-print a bitmask visualization for caperr."""
print("\n=== CAPERR BITMASK VISUALIZER ===")
print(f"Raw value: {mask} (0x{mask:02X})")
print("----------------------------------\n")
print("Bit | Hex | Active | Meaning")
print("----------------------------------")
for bit, meaning in self.caperr_bits.items():
active = "YES" if mask & bit else "no"
print(f"{bit:3d} | 0x{bit:02X} | {active:6} | {meaning}")
print("\nActive flags:")
active_flags = [meaning for bit, meaning in self.caperr_bits.items() if mask & bit]
if active_flags:
for f in active_flags:
print("", f)
else:
print(" (none)")
print("\n==================================\n")
def lamni_lights_off(self):
self.socket_put_confirmed("SB1")
@@ -93,7 +164,7 @@ class LamniGalilReadbackSignal(GalilSignalRO):
val = super().read()
if self.parent.axis_Id_numeric == 2:
try:
rt = self.parent.device_manager.devices[self.parent.rtx]
rt = self.parent.device_manager.devices[self.parent.rt]
if rt.enabled:
rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"])
except KeyError:
@@ -147,7 +218,7 @@ class LamniGalilMotor(Device, PositionerBase):
raise BECConfigError(
"device_mapping has been specified but the device_manager cannot be accessed."
)
self.rt = self.device_mapping.get("rt")
self.rt = self.device_mapping.get("rt", "rtx")
super().__init__(
prefix,

View File

@@ -11,6 +11,7 @@ from ophyd.status import wait as status_wait
from ophyd.utils import LimitError, ReadOnlyError
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected
from prettytable import PrettyTable
from csaxs_bec.devices.omny.rt.rt_ophyd import RtCommunicationError, RtError
@@ -51,6 +52,7 @@ class RtLamniController(Controller):
_axes_per_controller = 3
USER_ACCESS = [
"socket_put_and_receive",
"socket_put",
"set_rotation_angle",
"feedback_disable",
"feedback_enable_without_reset",
@@ -62,6 +64,11 @@ class RtLamniController(Controller):
"_set_axis_velocity_maximum_speed",
"_position_sampling_single_read",
"_position_sampling_single_reset_and_start_sampling",
"show_signal_strength_interferometer",
"show_interferometer_positions",
"show_analog_signals",
"show_feedback_status",
]
def __init__(
@@ -208,8 +215,9 @@ class RtLamniController(Controller):
@threadlocked
def start_scan(self):
interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0])
if interferometer_feedback_not_running == 1:
# interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0])
# if interferometer_feedback_not_running == 1:
if not self.feedback_is_running():
logger.error(
"Cannot start scan because feedback loop is not running or there is an interferometer error."
)
@@ -270,6 +278,44 @@ class RtLamniController(Controller):
"average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)},
}
return signals
def feedback_is_running(self) -> bool:
status = int(float((self.socket_put_and_receive("J2")).split(",")[0]))
return status == 0 # 0 means running, 1 means error/disabled
def show_feedback_status(self):
if self.feedback_is_running():
print("Loop is running, no error on interferometer.")
else:
print("Loop is not running, either it is turned off or an interferometer error occurred.")
def show_analog_signals(self) -> dict:
self.socket_put("As") # start sampling
time.sleep(0.01)
return_table = (self.socket_put_and_receive("Ar")).split(",")
number_of_samples = int(float(return_table[0]))
signals = {
"number_of_samples": number_of_samples,
"piezo_0": float(return_table[1]),
"piezo_1": float(return_table[2]),
"cap_0": float(return_table[3]),
"cap_1": float(return_table[4]),
"cap_2": float(return_table[5]),
"cap_3": float(return_table[6]),
"cap_4": float(return_table[7]),
}
t = PrettyTable()
t.title = f"LamNI Analog Signals ({number_of_samples} samples)"
t.field_names = ["Signal", "Value"]
for key, val in signals.items():
if key != "number_of_samples":
t.add_row([key, f"{val:.4f}"])
print(t)
return
def read_positions_from_sampler(self):
# this was for reading after the scan completed
@@ -347,6 +393,48 @@ class RtLamniController(Controller):
)
return bool(return_table[0])
def show_signal_strength_interferometer(self):
# trigger SSI averaging before reading
self.socket_put("J3")
time.sleep(0.05)
return_table = (self.socket_put_and_receive("J2")).split(",")
ssi_0 = float(return_table[1])
ssi_1 = float(return_table[2])
return_table_angle = (self.socket_put_and_receive("J7")).split(",")
angle_running = bool(int(float(return_table_angle[0])))
angle_position = float(return_table_angle[1])
angle_signal = float(return_table_angle[2])
t = PrettyTable()
t.title = "Interferometer signal strength"
t.field_names = ["Axis", "Description", "Value", "Running"]
t.add_row([0, "ST FZP horizontal", ssi_0, "-"])
t.add_row([1, "ST FZP vertical", ssi_1, "-"])
t.add_row([2, "Angle interferometer", angle_signal, angle_running])
print(t)
if angle_running:
print(f"Angle interferometer position: {angle_position:.4f} um")
else:
print("Warning: angle interferometer is not running.")
def show_interferometer_positions(self) -> dict:
return_table = (self.socket_put_and_receive("J4")).split(",")
loop_status = bool(int(float(return_table[0])))
pos_y = float(return_table[1])
pos_x = float(return_table[2])
t = PrettyTable()
t.title = "LamNI Interferometer Positions"
t.field_names = ["Axis", "Description", "Position (um)"]
t.add_row([0, "X", f"{pos_x:.4f}"])
t.add_row([1, "Y", f"{pos_y:.4f}"])
print(t)
print(f"Feedback loop running: {loop_status}")
return {"x": pos_x, "y": pos_y, "loop_running": loop_status}
def feedback_enable_with_reset(self):
if not self.feedback_status_angle_lamni():
self.feedback_disable_and_even_reset_lamni_angle_interferometer()