From 6f60bd4b2ba9eb05f398c419e42be3968db54f4a Mon Sep 17 00:00:00 2001 From: x01dc Date: Fri, 20 Feb 2026 14:39:22 +0100 Subject: [PATCH 1/3] first commit, getting lamni running and adding some missing features in controller rt and galil --- .../plugins/LamNI/lamni_optics_mixin.py | 32 ++++--- .../plugins/LamNI/x_ray_eye_align.py | 4 +- .../plugins/flomni/flomni.py | 52 +++++------ csaxs_bec/devices/omny/galil/lgalil_ophyd.py | 75 ++++++++++++++- csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py | 92 ++++++++++++++++++- 5 files changed, 210 insertions(+), 45 deletions(-) diff --git a/csaxs_bec/bec_ipython_client/plugins/LamNI/lamni_optics_mixin.py b/csaxs_bec/bec_ipython_client/plugins/LamNI/lamni_optics_mixin.py index 29d17eb..f70385c 100644 --- a/csaxs_bec/bec_ipython_client/plugins/LamNI/lamni_optics_mixin.py +++ b/csaxs_bec/bec_ipython_client/plugins/LamNI/lamni_optics_mixin.py @@ -6,6 +6,8 @@ from rich.console import Console from rich.table import Table from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_put, fshclose +from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools + # import builtins to avoid linter errors dev = builtins.__dict__.get("dev") @@ -18,25 +20,32 @@ class LamNIInitError(Exception): class LaMNIInitStagesMixin: + def __init__(self, client): + super().__init__() + self.client = client + self.OMNYTools = OMNYTools(self.client) + def lamni_init_stages(self): - user_input = input("Starting initialization of LamNI stages. OK? [y/n]") - if user_input == "y": + + if self.OMNYTools.yesno("Start initialization of LamNI stages. OK?"): print("staring...") dev.lsamrot.enabled = True else: return + + + if self.check_all_axes_of_lamni_referenced(): - user_input = input("Continue anyways? [y/n]") - if user_input == "y": + if self.OMNYTools.yesno("All axes are referenced. Continue anyways?"): print("ok then...") else: return axis_id_lsamrot = dev.lsamrot._config["deviceConfig"].get("axis_Id") if dev.lsamrot.controller.get_motor_limit_switch(axis_id_lsamrot)[1] == False: - user_input = input("The rotation stage will be moved to one limit [y/n]") - if user_input == "y": + + if self.OMNYTools.yesno("The rotation stage will be moved to one limit"): print("starting...") else: return @@ -47,10 +56,7 @@ class LaMNIInitStagesMixin: print("The controller will be disabled in bec. To enable dev.lsamrot.enabled=True") return - user_input = input( - "Init of loptz. Can the stage move to the upstream limit without collision?? [y/n]" - ) - if user_input == "y": + if self.OMNYTools.yesno("Init of loptz. Can the stage move to the upstream limit without collision?"): print("ok then...") else: return @@ -81,8 +87,7 @@ class LaMNIInitStagesMixin: time.sleep(0.1) self.find_reference_mark(dev.lsamrot) - user_input = input("Init of leye. Can the stage move to -x limit without collision? [y/n]") - if user_input == "y": + if self.OMNYTools.yesno("Init of leye. Can the stage move to -x limit without collision?"): print("starting...") else: return @@ -134,8 +139,7 @@ class LaMNIInitStagesMixin: return ord(axis_id.lower()) - 97 def _align_setup(self): - user_input = input("Start moving stages to default initial positions? [y/n]") - if user_input == "y": + if self.OMNYTools.yesno("Start moving stages to default initial positions?"): print("Start moving stages...") else: print("Stopping.") diff --git a/csaxs_bec/bec_ipython_client/plugins/LamNI/x_ray_eye_align.py b/csaxs_bec/bec_ipython_client/plugins/LamNI/x_ray_eye_align.py index 2b39022..b446e4c 100644 --- a/csaxs_bec/bec_ipython_client/plugins/LamNI/x_ray_eye_align.py +++ b/csaxs_bec/bec_ipython_client/plugins/LamNI/x_ray_eye_align.py @@ -15,6 +15,7 @@ from bec_lib.pdf_writer import PDFWriter from typeguard import typechecked from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen +from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools from .lamni_optics_mixin import LaMNIInitStagesMixin, LamNIOpticsMixin @@ -511,7 +512,7 @@ class LamNI(LamNIOpticsMixin): super().__init__() self.client = client self.align = XrayEyeAlign(client, self) - self.init = LaMNIInitStagesMixin() + self.init = LaMNIInitStagesMixin(client) self.check_shutter = True self.check_light_available = True self.check_fofb = True @@ -524,6 +525,7 @@ class LamNI(LamNIOpticsMixin): self._beam_is_okay = True self._stop_beam_check_event = None self.beam_check_thread = None + self.OMNYTools = OMNYTools(self.client) def get_beamline_checks_enabled(self): print( diff --git a/csaxs_bec/bec_ipython_client/plugins/flomni/flomni.py b/csaxs_bec/bec_ipython_client/plugins/flomni/flomni.py index 87f3680..806ee04 100644 --- a/csaxs_bec/bec_ipython_client/plugins/flomni/flomni.py +++ b/csaxs_bec/bec_ipython_client/plugins/flomni/flomni.py @@ -35,32 +35,32 @@ class FlomniInitError(Exception): class FlomniError(Exception): pass -class FlomniTools: - def yesno(self, message: str, default="none", autoconfirm=0) -> bool: - if autoconfirm and default == "y": - self.printgreen(message + " Automatically confirming default: yes") - return True - elif autoconfirm and default == "n": - self.printgreen(message + " Automatically confirming default: no") - return False - if default == "y": - message_ending = " [Y]/n? " - elif default == "n": - message_ending = " y/[N]? " - else: - message_ending = " y/n? " - while True: - user_input = input(self.OKBLUE + message + message_ending + self.ENDC) - if ( - user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes" - ) or (default == "y" and user_input == ""): - return True - if ( - user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No" - ) or (default == "n" and user_input == ""): - return False - else: - print("Please expicitely confirm y or n.") +# class FlomniTools: +# def yesno(self, message: str, default="none", autoconfirm=0) -> bool: +# if autoconfirm and default == "y": +# self.printgreen(message + " Automatically confirming default: yes") +# return True +# elif autoconfirm and default == "n": +# self.printgreen(message + " Automatically confirming default: no") +# return False +# if default == "y": +# message_ending = " [Y]/n? " +# elif default == "n": +# message_ending = " y/[N]? " +# else: +# message_ending = " y/n? " +# while True: +# user_input = input(self.OKBLUE + message + message_ending + self.ENDC) +# if ( +# user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes" +# ) or (default == "y" and user_input == ""): +# return True +# if ( +# user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No" +# ) or (default == "n" and user_input == ""): +# return False +# else: +# print("Please expicitely confirm y or n.") diff --git a/csaxs_bec/devices/omny/galil/lgalil_ophyd.py b/csaxs_bec/devices/omny/galil/lgalil_ophyd.py index 87d3c19..31757fa 100644 --- a/csaxs_bec/devices/omny/galil/lgalil_ophyd.py +++ b/csaxs_bec/devices/omny/galil/lgalil_ophyd.py @@ -25,6 +25,34 @@ logger = bec_logger.logger class LamniGalilController(GalilController): + + + # ============================================================ + # Error status + # ============================================================ + + caperr_bits = { + 0x01: "Cap1 outside expected left-stop range (early check)", + 0x02: "Cap2 outside expected left-stop range (early check)", + 0x04: "Cap1 too low during pressure-off check (near right boundary)", + 0x08: "Cap2 too low during pressure-off check (near right boundary)", + 0x10: "Cap1 exceeded allowed left-stop boundary during movement", + 0x20: "Cap2 exceeded allowed left-stop boundary during movement (disabled in code)", + 0x40: "Cap1 did not respond to test movement", + 0x80: "Cap2 did not respond to test movement" + } + + allaxrer_table = { + 1: "Not all axes referenced after reference search", + 2: "Pressure-loss emergency stop (pressure 14/15 active while motor C off)", + 3: "Unexpected pressure OFF while soft-limits not yet set", + 4: "Pressure valve mismatch (OUT13=0 but IN13=1)", + 5: "Capacitive sensor boundary violations (caperr > 0)", + 6: "Emergency Stop triggered (IN[5]=0)", + 7: "Following error detected on one or more axes" + } + + USER_ACCESS = [ "describe", "show_running_threads", @@ -37,6 +65,8 @@ class LamniGalilController(GalilController): "get_motor_limit_switch", "is_motor_on", "all_axes_referenced", + "lamni_lights_off", + "lamni_lights_on" ] def show_status_other(self): @@ -60,6 +90,47 @@ class LamniGalilController(GalilController): print("There is air pressure at the outer rotation radial.") swver = float(self.socket_put_and_receive("MGswver")) print(f"Lgalil LAMNI firmware version {swver:2.0f}.") + allaxref = int(float(self.socket_put_and_receive("MGallaxref"))) + print(f"Error statuts:") + if allaxref == 1: + print(f"Allaxref = 1, all OK.") + else: + print(f"Allaxref = {allaxref}. Not all axes are referenced or error introduced preventing motion.") + allaxrer = int(float(self.socket_put_and_receive("MGallaxrer"))) + print("\nallaxrer =", allaxrer) + print(self.decode_allaxrer(allaxrer)) + caperr = int(float(self.socket_put_and_receive("MGcaperr"))) + print("\nDecoding caperr =", caperr) + self.visualize_caperr(caperr) + + def decode_allaxrer(self, code: int) -> str: + """Return human-readable meaning of allaxrer code.""" + return self.allaxrer_table.get(code, "Unknown allaxrer code") + + def visualize_caperr(self, mask: int): + """Pretty-print a bitmask visualization for caperr.""" + print("\n=== CAPERR BITMASK VISUALIZER ===") + print(f"Raw value: {mask} (0x{mask:02X})") + print("----------------------------------\n") + + print("Bit | Hex | Active | Meaning") + print("----------------------------------") + + for bit, meaning in self.caperr_bits.items(): + active = "YES" if mask & bit else "no" + print(f"{bit:3d} | 0x{bit:02X} | {active:6} | {meaning}") + + print("\nActive flags:") + active_flags = [meaning for bit, meaning in self.caperr_bits.items() if mask & bit] + + if active_flags: + for f in active_flags: + print(" ✓", f) + else: + print(" (none)") + + print("\n==================================\n") + def lamni_lights_off(self): self.socket_put_confirmed("SB1") @@ -93,7 +164,7 @@ class LamniGalilReadbackSignal(GalilSignalRO): val = super().read() if self.parent.axis_Id_numeric == 2: try: - rt = self.parent.device_manager.devices[self.parent.rtx] + rt = self.parent.device_manager.devices[self.parent.rt] if rt.enabled: rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"]) except KeyError: @@ -147,7 +218,7 @@ class LamniGalilMotor(Device, PositionerBase): raise BECConfigError( "device_mapping has been specified but the device_manager cannot be accessed." ) - self.rt = self.device_mapping.get("rt") + self.rt = self.device_mapping.get("rt", "rtx") super().__init__( prefix, diff --git a/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py b/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py index 8e65acc..b529ba9 100644 --- a/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py +++ b/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py @@ -11,6 +11,7 @@ from ophyd.status import wait as status_wait from ophyd.utils import LimitError, ReadOnlyError from ophyd_devices.utils.controller import Controller, threadlocked from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected +from prettytable import PrettyTable from csaxs_bec.devices.omny.rt.rt_ophyd import RtCommunicationError, RtError @@ -51,6 +52,7 @@ class RtLamniController(Controller): _axes_per_controller = 3 USER_ACCESS = [ "socket_put_and_receive", + "socket_put", "set_rotation_angle", "feedback_disable", "feedback_enable_without_reset", @@ -62,6 +64,11 @@ class RtLamniController(Controller): "_set_axis_velocity_maximum_speed", "_position_sampling_single_read", "_position_sampling_single_reset_and_start_sampling", + "show_signal_strength_interferometer", + "show_interferometer_positions", + "show_analog_signals", + "show_feedback_status", + ] def __init__( @@ -208,8 +215,9 @@ class RtLamniController(Controller): @threadlocked def start_scan(self): - interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0]) - if interferometer_feedback_not_running == 1: + # interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0]) + # if interferometer_feedback_not_running == 1: + if not self.feedback_is_running(): logger.error( "Cannot start scan because feedback loop is not running or there is an interferometer error." ) @@ -270,6 +278,44 @@ class RtLamniController(Controller): "average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)}, } return signals + + def feedback_is_running(self) -> bool: + status = int(float((self.socket_put_and_receive("J2")).split(",")[0])) + return status == 0 # 0 means running, 1 means error/disabled + + def show_feedback_status(self): + if self.feedback_is_running(): + print("Loop is running, no error on interferometer.") + else: + print("Loop is not running, either it is turned off or an interferometer error occurred.") + + + def show_analog_signals(self) -> dict: + self.socket_put("As") # start sampling + time.sleep(0.01) + return_table = (self.socket_put_and_receive("Ar")).split(",") + + number_of_samples = int(float(return_table[0])) + signals = { + "number_of_samples": number_of_samples, + "piezo_0": float(return_table[1]), + "piezo_1": float(return_table[2]), + "cap_0": float(return_table[3]), + "cap_1": float(return_table[4]), + "cap_2": float(return_table[5]), + "cap_3": float(return_table[6]), + "cap_4": float(return_table[7]), + } + + t = PrettyTable() + t.title = f"LamNI Analog Signals ({number_of_samples} samples)" + t.field_names = ["Signal", "Value"] + for key, val in signals.items(): + if key != "number_of_samples": + t.add_row([key, f"{val:.4f}"]) + print(t) + + return signals def read_positions_from_sampler(self): # this was for reading after the scan completed @@ -347,6 +393,48 @@ class RtLamniController(Controller): ) return bool(return_table[0]) + def show_signal_strength_interferometer(self): + # trigger SSI averaging before reading + self.socket_put("J3") + time.sleep(0.05) + return_table = (self.socket_put_and_receive("J2")).split(",") + ssi_0 = float(return_table[1]) + ssi_1 = float(return_table[2]) + + return_table_angle = (self.socket_put_and_receive("J7")).split(",") + angle_running = bool(int(float(return_table_angle[0]))) + angle_position = float(return_table_angle[1]) + angle_signal = float(return_table_angle[2]) + + t = PrettyTable() + t.title = "Interferometer signal strength" + t.field_names = ["Axis", "Description", "Value", "Running"] + t.add_row([0, "ST FZP horizontal", ssi_0, "-"]) + t.add_row([1, "ST FZP vertical", ssi_1, "-"]) + t.add_row([2, "Angle interferometer", angle_signal, angle_running]) + print(t) + + if angle_running: + print(f"Angle interferometer position: {angle_position:.4f} um") + else: + print("Warning: angle interferometer is not running.") + + def show_interferometer_positions(self) -> dict: + return_table = (self.socket_put_and_receive("J4")).split(",") + loop_status = bool(int(float(return_table[0]))) + pos_y = float(return_table[1]) + pos_x = float(return_table[2]) + + t = PrettyTable() + t.title = "LamNI Interferometer Positions" + t.field_names = ["Axis", "Description", "Position (um)"] + t.add_row([0, "X", f"{pos_x:.4f}"]) + t.add_row([1, "Y", f"{pos_y:.4f}"]) + print(t) + print(f"Feedback loop running: {loop_status}") + + return {"x": pos_x, "y": pos_y, "loop_running": loop_status} + def feedback_enable_with_reset(self): if not self.feedback_status_angle_lamni(): self.feedback_disable_and_even_reset_lamni_angle_interferometer() -- 2.49.1 From 48df15f35c72654158bd974d5165480561420287 Mon Sep 17 00:00:00 2001 From: x01dc Date: Sat, 21 Feb 2026 11:38:34 +0100 Subject: [PATCH 2/3] added rt controller commands to lamni namespace --- .../plugins/LamNI/x_ray_eye_align.py | 51 ++++++++++++++++++- csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py | 2 +- 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/csaxs_bec/bec_ipython_client/plugins/LamNI/x_ray_eye_align.py b/csaxs_bec/bec_ipython_client/plugins/LamNI/x_ray_eye_align.py index b446e4c..e8c12e2 100644 --- a/csaxs_bec/bec_ipython_client/plugins/LamNI/x_ray_eye_align.py +++ b/csaxs_bec/bec_ipython_client/plugins/LamNI/x_ray_eye_align.py @@ -20,9 +20,14 @@ from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYToo from .lamni_optics_mixin import LaMNIInitStagesMixin, LamNIOpticsMixin logger = bec_logger.logger -bec = builtins.__dict__.get("bec") +if builtins.__dict__.get("bec") is not None: + bec = builtins.__dict__.get("bec") + dev = builtins.__dict__.get("dev") + umv = builtins.__dict__.get("umv") + umvr = builtins.__dict__.get("umvr") + class XrayEyeAlign: # pixel calibration, multiply to get mm # PIXEL_CALIBRATION = 0.2/209 #.2 with binning @@ -511,6 +516,7 @@ class LamNI(LamNIOpticsMixin): def __init__(self, client): super().__init__() self.client = client + self.device_manager = client.device_manager self.align = XrayEyeAlign(client, self) self.init = LaMNIInitStagesMixin(client) self.check_shutter = True @@ -911,6 +917,49 @@ class LamNI(LamNIOpticsMixin): except Exception: logger.warning("Failed to send update to SciLog.") + def rt_off(self): + dev.rtx.enabled = False + dev.rty.enabled = False + + def rt_on(self): + dev.rtx.enabled = True + dev.rty.enabled = True + if dev.rtx.enabled == True: + print("rt is enabled") + else: + print("failed to enable rt") + + def feedback_enable_with_reset(self): + self.device_manager.devices.rtx.controller.feedback_enable_with_reset() + self.feedback_status() + + def feedback_enable_without_reset(self): + self.device_manager.devices.rtx.controller.feedback_enable_without_reset() + self.feedback_status() + + def feedback_disable(self): + self.device_manager.devices.rtx.controller.feedback_disable() + self.feedback_status() + + def feedback_disable_and_reset_angle(self): + self.device_manager.devices.rtx.controller.feedback_disable_and_even_reset_lamni_angle_interferometer() + self.feedback_status() + + def feedback_status(self): + if self.device_manager.devices.rtx.controller.feedback_is_running(): + print("The rt feedback is \x1b[92mrunning\x1b[0m.") + else: + print("The rt feedback is \x1b[91mNOT\x1b[0m running.") + + def show_interferometer_positions(self): + self.device_manager.devices.rtx.controller.show_interferometer_positions() + + def show_signal_strength(self): + self.device_manager.devices.rtx.controller.show_signal_strength_interferometer() + + def show_analog_signals(self): + return self.device_manager.devices.rtx.controller.show_analog_signals() + def add_sample_database( self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user ): diff --git a/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py b/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py index b529ba9..15a4d93 100644 --- a/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py +++ b/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py @@ -315,7 +315,7 @@ class RtLamniController(Controller): t.add_row([key, f"{val:.4f}"]) print(t) - return signals + return def read_positions_from_sampler(self): # this was for reading after the scan completed -- 2.49.1 From 54f1f42332633668ff6ae114303617214b26a9d3 Mon Sep 17 00:00:00 2001 From: x01dc Date: Sat, 21 Feb 2026 11:52:32 +0100 Subject: [PATCH 3/3] added tomo type 2 and 3, for golden ratio --- .../plugins/LamNI/x_ray_eye_align.py | 344 ++++++++++++++++-- 1 file changed, 317 insertions(+), 27 deletions(-) diff --git a/csaxs_bec/bec_ipython_client/plugins/LamNI/x_ray_eye_align.py b/csaxs_bec/bec_ipython_client/plugins/LamNI/x_ray_eye_align.py index e8c12e2..88c0957 100644 --- a/csaxs_bec/bec_ipython_client/plugins/LamNI/x_ray_eye_align.py +++ b/csaxs_bec/bec_ipython_client/plugins/LamNI/x_ray_eye_align.py @@ -532,6 +532,15 @@ class LamNI(LamNIOpticsMixin): self._stop_beam_check_event = None self.beam_check_thread = None self.OMNYTools = OMNYTools(self.client) + # Progress tracking + self.progress = {} + self.progress["tomo_type"] = "Equally spaced sub-tomograms" + self.progress["subtomo"] = 0 + self.progress["subtomo_projection"] = 0 + self.progress["subtomo_total_projections"] = 1 + self.progress["projection"] = 0 + self.progress["total_projections"] = 1 + self.progress["angle"] = 0 def get_beamline_checks_enabled(self): print( @@ -606,12 +615,12 @@ class LamNI(LamNIOpticsMixin): if val == 1: # equally spaced tomography with 8 sub tomograms self.client.set_global_var("tomo_type", val) - # elif val == 2: - # # golden ratio tomography (sorted bunches) - # self.client.set_global_var("tomo_type", val) - # elif val == 3: - # # equally spaced tomography with starting angles shifted by golden ratio - # self.client.set_global_var("tomo_type", val) + elif val == 2: + # golden ratio tomography (sorted bunches) + self.client.set_global_var("tomo_type", val) + elif val == 3: + # equally spaced tomography with starting angles shifted by golden ratio + self.client.set_global_var("tomo_type", val) else: raise ValueError("Unknown tomo_type.") @@ -748,6 +757,41 @@ class LamNI(LamNIOpticsMixin): def tomo_stitch_overlap(self, val: float): self.client.set_global_var("tomo_stitch_overlap", val) + @property + def golden_max_number_of_projections(self): + val = self.client.get_global_var("golden_max_number_of_projections") + if val is None: + return 1000.0 + return val + + @golden_max_number_of_projections.setter + def golden_max_number_of_projections(self, val: float): + self.client.set_global_var("golden_max_number_of_projections", val) + + @property + def golden_ratio_bunch_size(self): + val = self.client.get_global_var("golden_ratio_bunch_size") + if val is None: + return 20 + return val + + @golden_ratio_bunch_size.setter + def golden_ratio_bunch_size(self, val: float): + if val < 20: + raise ValueError("golden_ratio_bunch_size must be at least 20.") + self.client.set_global_var("golden_ratio_bunch_size", val) + + @property + def golden_projections_at_0_deg_for_damage_estimation(self): + val = self.client.get_global_var("golden_projections_at_0_deg_for_damage_estimation") + if val is None: + return 0 + return val + + @golden_projections_at_0_deg_for_damage_estimation.setter + def golden_projections_at_0_deg_for_damage_estimation(self, val: float): + self.client.set_global_var("golden_projections_at_0_deg_for_damage_estimation", val) + @property def sample_name(self): val = self.client.get_global_var("sample_name") @@ -986,9 +1030,8 @@ class LamNI(LamNIOpticsMixin): # self.tomo_scan_projection(angle) # self.tomo_reconstruct() - def sub_tomo_scan(self, subtomo_number, start_angle=None): - """start a subtomo""" - dev = builtins.__dict__.get("dev") + def _write_subtomo_to_scilog(self, subtomo_number): + """Write subtomo start information to scilog.""" bec = builtins.__dict__.get("bec") if self.tomo_id > 0: tags = ["BEC_subtomo", self.sample_name, f"tomo_id_{self.tomo_id}"] @@ -999,6 +1042,10 @@ class LamNI(LamNIOpticsMixin): tags, ) + def sub_tomo_scan(self, subtomo_number, start_angle=None): + """start a subtomo""" + self._write_subtomo_to_scilog(subtomo_number) + if start_angle is None: if subtomo_number == 1: start_angle = 0 @@ -1031,13 +1078,14 @@ class LamNI(LamNIOpticsMixin): if not (subtomo_number % 2): angles = np.flip(angles) for angle in angles: + self.progress["tomo_type"] = "Equally spaced sub-tomograms" self.progress["subtomo"] = subtomo_number - self.progress["subtomo_projection"] = angles.index(angle) - self.progress["subtomo_total_projections"] = 180 / self.tomo_angle_stepsize + self.progress["subtomo_projection"] = np.where(angles == angle)[0][0] + self.progress["subtomo_total_projections"] = 360 / self.tomo_angle_stepsize self.progress["projection"] = (subtomo_number - 1) * self.progress[ "subtomo_total_projections" ] + self.progress["subtomo_projection"] - self.progress["total_projections"] = 180 / self.tomo_angle_stepsize * 8 + self.progress["total_projections"] = 360 / self.tomo_angle_stepsize * 8 self.progress["angle"] = angle self._tomo_scan_at_angle(angle, subtomo_number) @@ -1100,6 +1148,11 @@ class LamNI(LamNIOpticsMixin): for scan_nr in range(start_scan_number, end_scan_number): self._write_tomo_scan_number(scan_nr, angle, subtomo_number) + if self._was_beam_okay() and not error_caught: + successful = True + else: + self._wait_for_beamline_checks() + def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None: tomo_scan_numbers_file = os.path.expanduser( "~/Data10/specES1/dat-files/tomography_scannumbers.txt" @@ -1110,13 +1163,74 @@ class LamNI(LamNIOpticsMixin): f"{scan_number} {angle} {dev.lsamrot.read()['lsamrot']['value']:.3f} {self.tomo_id} {subtomo_number} {0} {'lamni'}\n" ) - def tomo_scan(self, subtomo_start=1, start_angle=None): - """start a tomo scan""" + def _golden(self, ii, howmany_sorted, maxangle=360, reverse=False): + """Return the ii-th golden ratio angle within sorted bunches of size howmany_sorted, + and its subtomo number. Operates over maxangle degrees (360 for LamNI).""" + golden = [] + for iji in range( + (ii - (ii % howmany_sorted)), (ii - (ii % howmany_sorted)) + howmany_sorted, 1 + ): + golden.append( + ((iji * maxangle * (1 + pow(5, 0.5)) / 2) * 1000 % (maxangle * 1000)) / 1000 + ) + golden.sort() + subtomo_number = int(ii / howmany_sorted) + 1 + if reverse and not subtomo_number % 2: + golden.reverse() + return (golden[ii % howmany_sorted], subtomo_number) + + def _golden_equally_spaced(self, ii, number_of_projections_per_subtomo, maxangle=360, reverse=True, verbose=False): + """Return angles for equally spaced tomography with sub-tomogram starting angles + shifted according to the golden ratio. Operates over maxangle degrees (360 for LamNI). + ii is the projection number starting at 0.""" + angular_step = maxangle / number_of_projections_per_subtomo + subtomo_number = int(((ii) * angular_step) / maxangle) + 1 + start_angle = self._golden(subtomo_number - 1, 1, angular_step)[0] + projection_number_of_subtomo = ( + ii - (subtomo_number - 1) * number_of_projections_per_subtomo + ) + + if reverse: + if subtomo_number % 2: + angle = start_angle + projection_number_of_subtomo * angular_step + else: + angle = ( + start_angle + + (number_of_projections_per_subtomo - 1) * angular_step + - projection_number_of_subtomo * angular_step + ) + else: + angle = start_angle + projection_number_of_subtomo * angular_step + + if verbose: + print( + f"Equally spaced golden ratio tomography.\n" + f"Angular step: {angular_step}\n" + f"Subtomo Number: {subtomo_number}\n" + f"Angle: {angle}" + ) + + return angle, subtomo_number + + def tomo_scan(self, subtomo_start=1, start_angle=None, projection_number=None): + """Start a tomo scan. + + Args: + subtomo_start (int): For tomo_type 1, the sub-tomogram number to start from. Defaults to 1. + start_angle (float, optional): Override the starting angle of the first sub-tomogram. Defaults to None. + projection_number (int, optional): For tomo_types 2 and 3, resume from this projection index. Defaults to None. + """ bec = builtins.__dict__.get("bec") scans = builtins.__dict__.get("scans") self._current_special_angles = self.special_angles.copy() - if self.tomo_type == 1 and subtomo_start == 1 and start_angle is None: + # Register a new tomo scan in the database and write the PDF report + # only when starting fresh (not resuming mid-scan) + if ( + (self.tomo_type == 1 and subtomo_start == 1 and start_angle is None) + or (self.tomo_type == 2 and projection_number is None) + or (self.tomo_type == 3 and projection_number is None) + ): # pylint: disable=undefined-variable self.tomo_id = self.add_sample_database( self.sample_name, @@ -1128,10 +1242,108 @@ class LamNI(LamNIOpticsMixin): "BEC", ) self.write_pdf_report() + with scans.dataset_id_on_hold: - for ii in range(subtomo_start, 9): - self.sub_tomo_scan(ii, start_angle=start_angle) - start_angle = None + if self.tomo_type == 1: + # 8 equally spaced sub-tomograms over 360 degrees + self.progress["tomo_type"] = "Equally spaced sub-tomograms" + for ii in range(subtomo_start, 9): + self.sub_tomo_scan(ii, start_angle=start_angle) + start_angle = None + + elif self.tomo_type == 2: + # Golden ratio tomography (sorted bunches) over 360 degrees + self.progress["tomo_type"] = "Golden ratio tomography" + previous_subtomo_number = -1 + ii = 0 if projection_number is None else projection_number + while True: + angle, subtomo_number = self._golden( + ii, self.golden_ratio_bunch_size, maxangle=360, reverse=True + ) + if previous_subtomo_number != subtomo_number: + self._write_subtomo_to_scilog(subtomo_number) + if ( + subtomo_number % 2 == 1 + and ii > 10 + and self.golden_projections_at_0_deg_for_damage_estimation == 1 + ): + self._tomo_scan_at_angle(0, subtomo_number) + previous_subtomo_number = subtomo_number + + self.progress["subtomo"] = subtomo_number + self.progress["projection"] = ii + self.progress["angle"] = angle + if self.golden_ratio_bunch_size > 0: + self.progress["subtomo_total_projections"] = self.golden_ratio_bunch_size + self.progress["subtomo_projection"] = ( + ii - (subtomo_number - 1) * self.golden_ratio_bunch_size + ) + else: + self.progress["subtomo_total_projections"] = 0 + self.progress["subtomo_projection"] = 0 + self.progress["total_projections"] = ( + self.golden_max_number_of_projections + if self.golden_max_number_of_projections > 0 + else 0 + ) + + self._tomo_scan_at_angle(angle, subtomo_number) + ii += 1 + if ( + self.golden_max_number_of_projections > 0 + and ii > self.golden_max_number_of_projections + ): + print( + f"Golden ratio tomography stopped automatically after the requested" + f" {self.golden_max_number_of_projections} projections." + ) + break + + elif self.tomo_type == 3: + # Equally spaced tomography with golden ratio starting angles over 360 degrees + self.progress["tomo_type"] = "Equally spaced, golden ratio starting angles" + previous_subtomo_number = -1 + ii = 0 if projection_number is None else projection_number + while True: + angle, subtomo_number = self._golden_equally_spaced( + ii, int(360 / self.tomo_angle_stepsize), maxangle=360, reverse=True + ) + if previous_subtomo_number != subtomo_number: + self._write_subtomo_to_scilog(subtomo_number) + if ( + subtomo_number % 2 == 1 + and ii > 10 + and self.golden_projections_at_0_deg_for_damage_estimation == 1 + ): + self._tomo_scan_at_angle(0, subtomo_number) + previous_subtomo_number = subtomo_number + + self.progress["subtomo"] = subtomo_number + self.progress["projection"] = ii + self.progress["angle"] = angle + self.progress["subtomo_total_projections"] = 360 / self.tomo_angle_stepsize + self.progress["subtomo_projection"] = ( + ii - (subtomo_number - 1) * self.progress["subtomo_total_projections"] + ) + self.progress["total_projections"] = ( + self.golden_max_number_of_projections + if self.golden_max_number_of_projections > 0 + else 0 + ) + + self._tomo_scan_at_angle(angle, subtomo_number) + ii += 1 + if ( + self.golden_max_number_of_projections > 0 + and ii > self.golden_max_number_of_projections + ): + print( + f"Golden ratio tomography stopped automatically after the requested" + f" {self.golden_max_number_of_projections} projections." + ) + break + else: + raise ValueError(f"Unknown tomo_type: {self.tomo_type}.") def tomo_parameters(self): """print and update the tomo parameters""" @@ -1154,9 +1366,37 @@ class LamNI(LamNIOpticsMixin): print(f" _tomo_fovy_offset = {self.align.tomo_fovy_offset}") print(f" _manual_shift_x = {self.manual_shift_x}") print(f" _manual_shift_y = {self.manual_shift_y}") - print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees") - print(f"Resulting in number of projections: {360/self.tomo_angle_stepsize*8}") - print(f"Sample name: {self.sample_name}\n") + print("") + if self.tomo_type == 1: + print("\x1b[1mTomo type 1:\x1b[0m 8 equally spaced sub-tomograms") + print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees") + print(f"Resulting in number of projections: {360/self.tomo_angle_stepsize*8}") + elif self.tomo_type == 2: + print("\x1b[1mTomo type 2:\x1b[0m Golden ratio tomography") + print(f"Sorted in bunches of: {self.golden_ratio_bunch_size}") + if self.golden_max_number_of_projections > 0: + print(f"Ending after {self.golden_max_number_of_projections} projections.") + else: + print("Ending by manual interruption.") + if self.golden_projections_at_0_deg_for_damage_estimation == 1: + print( + "Repeating projections at 0 degrees at the beginning of every second subtomogram." + ) + elif self.tomo_type == 3: + print( + "\x1b[1mTomo type 3:\x1b[0m Equally spaced tomography, golden ratio starting angle" + ) + print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees") + print(f"Number of projections per sub-tomogram: {360/self.tomo_angle_stepsize}") + if self.golden_max_number_of_projections > 0: + print(f"Ending after {self.golden_max_number_of_projections} projections.") + else: + print("Ending by manual interruption.") + if self.golden_projections_at_0_deg_for_damage_estimation == 1: + print( + "Repeating projections at 0 degrees at the beginning of every second subtomogram." + ) + print(f"\nSample name: {self.sample_name}\n") user_input = input("Are these parameters correctly set for your scan? ") if user_input == "y": @@ -1176,13 +1416,61 @@ class LamNI(LamNIOpticsMixin): self.ptycho_reconstruct_foldername = self._get_val( "Reconstruction queue ", self.ptycho_reconstruct_foldername, str ) - tomo_numberofprojections = self._get_val( - "Number of projections", 360 / self.tomo_angle_stepsize * 8, int - ) - print(f"The angular step will be {360/tomo_numberofprojections}") - self.tomo_angle_stepsize = 360 / tomo_numberofprojections * 8 - print(f"The angular step in a subtomogram it will be {self.tomo_angle_stepsize}") + print("Tomography type:") + print(" 1: 8 equally spaced sub-tomograms (360 deg)") + print(" 2: Golden ratio tomography") + print(" 3: Equally spaced tomography, golden ratio starting angle") + self.tomo_type = self._get_val("Tomography type", self.tomo_type, int) + + if self.tomo_type == 1: + tomo_numberofprojections = self._get_val( + "Number of projections", 360 / self.tomo_angle_stepsize * 8, int + ) + print(f"The angular step will be {360/tomo_numberofprojections}") + self.tomo_angle_stepsize = 360 / tomo_numberofprojections * 8 + print(f"The angular step in a subtomogram it will be {self.tomo_angle_stepsize}") + + elif self.tomo_type == 2: + while True: + bunch_size = self._get_val( + "Number of projections sorted per bunch (minimum 20)", + self.golden_ratio_bunch_size, + int, + ) + if bunch_size >= 20: + self.golden_ratio_bunch_size = bunch_size + break + print("Bunch size must be at least 20. Please try again.") + self.golden_max_number_of_projections = self._get_val( + "Stop after number of projections (0 for endless)", + self.golden_max_number_of_projections, + int, + ) + self.golden_projections_at_0_deg_for_damage_estimation = self._get_val( + "Repeat projections at 0 deg every second subtomo 1/0?", + self.golden_projections_at_0_deg_for_damage_estimation, + int, + ) + + elif self.tomo_type == 3: + numprj = self._get_val( + "Number of projections per sub-tomogram", + int(360 / self.tomo_angle_stepsize), + int, + ) + self.tomo_angle_stepsize = 360 / numprj + self.golden_max_number_of_projections = self._get_val( + "Stop after number of projections (0 for endless)", + self.golden_max_number_of_projections, + int, + ) + self.golden_projections_at_0_deg_for_damage_estimation = self._get_val( + "Repeat projections at 0 deg every second subtomo 1/0?", + self.golden_projections_at_0_deg_for_damage_estimation, + int, + ) + self.sample_name = self._get_val("sample name", self.sample_name, str) @staticmethod @@ -1248,6 +1536,7 @@ class LamNI(LamNIOpticsMixin): ( f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n" ), + f"{'Tomo type:':<{padding}}{self.tomo_type:>{padding}}\n", ] content = "".join(content) user_target = os.path.expanduser(f"~/Data10/documentation/tomo_scan_ID_{self.tomo_id}.pdf") @@ -1440,3 +1729,4 @@ class DataDrivenLamNI(LamNI): shapes.append(data.shape) if len(set(shapes)) > 1: raise ValueError(f"Tomo data file has entries of inconsistent lengths: {shapes}.") + \ No newline at end of file -- 2.49.1