From 6f60bd4b2ba9eb05f398c419e42be3968db54f4a Mon Sep 17 00:00:00 2001 From: x01dc Date: Fri, 20 Feb 2026 14:39:22 +0100 Subject: [PATCH] first commit, getting lamni running and adding some missing features in controller rt and galil --- .../plugins/LamNI/lamni_optics_mixin.py | 32 ++++--- .../plugins/LamNI/x_ray_eye_align.py | 4 +- .../plugins/flomni/flomni.py | 52 +++++------ csaxs_bec/devices/omny/galil/lgalil_ophyd.py | 75 ++++++++++++++- csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py | 92 ++++++++++++++++++- 5 files changed, 210 insertions(+), 45 deletions(-) diff --git a/csaxs_bec/bec_ipython_client/plugins/LamNI/lamni_optics_mixin.py b/csaxs_bec/bec_ipython_client/plugins/LamNI/lamni_optics_mixin.py index 29d17eb..f70385c 100644 --- a/csaxs_bec/bec_ipython_client/plugins/LamNI/lamni_optics_mixin.py +++ b/csaxs_bec/bec_ipython_client/plugins/LamNI/lamni_optics_mixin.py @@ -6,6 +6,8 @@ from rich.console import Console from rich.table import Table from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_put, fshclose +from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools + # import builtins to avoid linter errors dev = builtins.__dict__.get("dev") @@ -18,25 +20,32 @@ class LamNIInitError(Exception): class LaMNIInitStagesMixin: + def __init__(self, client): + super().__init__() + self.client = client + self.OMNYTools = OMNYTools(self.client) + def lamni_init_stages(self): - user_input = input("Starting initialization of LamNI stages. OK? [y/n]") - if user_input == "y": + + if self.OMNYTools.yesno("Start initialization of LamNI stages. OK?"): print("staring...") dev.lsamrot.enabled = True else: return + + + if self.check_all_axes_of_lamni_referenced(): - user_input = input("Continue anyways? [y/n]") - if user_input == "y": + if self.OMNYTools.yesno("All axes are referenced. Continue anyways?"): print("ok then...") else: return axis_id_lsamrot = dev.lsamrot._config["deviceConfig"].get("axis_Id") if dev.lsamrot.controller.get_motor_limit_switch(axis_id_lsamrot)[1] == False: - user_input = input("The rotation stage will be moved to one limit [y/n]") - if user_input == "y": + + if self.OMNYTools.yesno("The rotation stage will be moved to one limit"): print("starting...") else: return @@ -47,10 +56,7 @@ class LaMNIInitStagesMixin: print("The controller will be disabled in bec. To enable dev.lsamrot.enabled=True") return - user_input = input( - "Init of loptz. Can the stage move to the upstream limit without collision?? [y/n]" - ) - if user_input == "y": + if self.OMNYTools.yesno("Init of loptz. Can the stage move to the upstream limit without collision?"): print("ok then...") else: return @@ -81,8 +87,7 @@ class LaMNIInitStagesMixin: time.sleep(0.1) self.find_reference_mark(dev.lsamrot) - user_input = input("Init of leye. Can the stage move to -x limit without collision? [y/n]") - if user_input == "y": + if self.OMNYTools.yesno("Init of leye. Can the stage move to -x limit without collision?"): print("starting...") else: return @@ -134,8 +139,7 @@ class LaMNIInitStagesMixin: return ord(axis_id.lower()) - 97 def _align_setup(self): - user_input = input("Start moving stages to default initial positions? [y/n]") - if user_input == "y": + if self.OMNYTools.yesno("Start moving stages to default initial positions?"): print("Start moving stages...") else: print("Stopping.") diff --git a/csaxs_bec/bec_ipython_client/plugins/LamNI/x_ray_eye_align.py b/csaxs_bec/bec_ipython_client/plugins/LamNI/x_ray_eye_align.py index 2b39022..b446e4c 100644 --- a/csaxs_bec/bec_ipython_client/plugins/LamNI/x_ray_eye_align.py +++ b/csaxs_bec/bec_ipython_client/plugins/LamNI/x_ray_eye_align.py @@ -15,6 +15,7 @@ from bec_lib.pdf_writer import PDFWriter from typeguard import typechecked from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen +from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools from .lamni_optics_mixin import LaMNIInitStagesMixin, LamNIOpticsMixin @@ -511,7 +512,7 @@ class LamNI(LamNIOpticsMixin): super().__init__() self.client = client self.align = XrayEyeAlign(client, self) - self.init = LaMNIInitStagesMixin() + self.init = LaMNIInitStagesMixin(client) self.check_shutter = True self.check_light_available = True self.check_fofb = True @@ -524,6 +525,7 @@ class LamNI(LamNIOpticsMixin): self._beam_is_okay = True self._stop_beam_check_event = None self.beam_check_thread = None + self.OMNYTools = OMNYTools(self.client) def get_beamline_checks_enabled(self): print( diff --git a/csaxs_bec/bec_ipython_client/plugins/flomni/flomni.py b/csaxs_bec/bec_ipython_client/plugins/flomni/flomni.py index 87f3680..806ee04 100644 --- a/csaxs_bec/bec_ipython_client/plugins/flomni/flomni.py +++ b/csaxs_bec/bec_ipython_client/plugins/flomni/flomni.py @@ -35,32 +35,32 @@ class FlomniInitError(Exception): class FlomniError(Exception): pass -class FlomniTools: - def yesno(self, message: str, default="none", autoconfirm=0) -> bool: - if autoconfirm and default == "y": - self.printgreen(message + " Automatically confirming default: yes") - return True - elif autoconfirm and default == "n": - self.printgreen(message + " Automatically confirming default: no") - return False - if default == "y": - message_ending = " [Y]/n? " - elif default == "n": - message_ending = " y/[N]? " - else: - message_ending = " y/n? " - while True: - user_input = input(self.OKBLUE + message + message_ending + self.ENDC) - if ( - user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes" - ) or (default == "y" and user_input == ""): - return True - if ( - user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No" - ) or (default == "n" and user_input == ""): - return False - else: - print("Please expicitely confirm y or n.") +# class FlomniTools: +# def yesno(self, message: str, default="none", autoconfirm=0) -> bool: +# if autoconfirm and default == "y": +# self.printgreen(message + " Automatically confirming default: yes") +# return True +# elif autoconfirm and default == "n": +# self.printgreen(message + " Automatically confirming default: no") +# return False +# if default == "y": +# message_ending = " [Y]/n? " +# elif default == "n": +# message_ending = " y/[N]? " +# else: +# message_ending = " y/n? " +# while True: +# user_input = input(self.OKBLUE + message + message_ending + self.ENDC) +# if ( +# user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes" +# ) or (default == "y" and user_input == ""): +# return True +# if ( +# user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No" +# ) or (default == "n" and user_input == ""): +# return False +# else: +# print("Please expicitely confirm y or n.") diff --git a/csaxs_bec/devices/omny/galil/lgalil_ophyd.py b/csaxs_bec/devices/omny/galil/lgalil_ophyd.py index 87d3c19..31757fa 100644 --- a/csaxs_bec/devices/omny/galil/lgalil_ophyd.py +++ b/csaxs_bec/devices/omny/galil/lgalil_ophyd.py @@ -25,6 +25,34 @@ logger = bec_logger.logger class LamniGalilController(GalilController): + + + # ============================================================ + # Error status + # ============================================================ + + caperr_bits = { + 0x01: "Cap1 outside expected left-stop range (early check)", + 0x02: "Cap2 outside expected left-stop range (early check)", + 0x04: "Cap1 too low during pressure-off check (near right boundary)", + 0x08: "Cap2 too low during pressure-off check (near right boundary)", + 0x10: "Cap1 exceeded allowed left-stop boundary during movement", + 0x20: "Cap2 exceeded allowed left-stop boundary during movement (disabled in code)", + 0x40: "Cap1 did not respond to test movement", + 0x80: "Cap2 did not respond to test movement" + } + + allaxrer_table = { + 1: "Not all axes referenced after reference search", + 2: "Pressure-loss emergency stop (pressure 14/15 active while motor C off)", + 3: "Unexpected pressure OFF while soft-limits not yet set", + 4: "Pressure valve mismatch (OUT13=0 but IN13=1)", + 5: "Capacitive sensor boundary violations (caperr > 0)", + 6: "Emergency Stop triggered (IN[5]=0)", + 7: "Following error detected on one or more axes" + } + + USER_ACCESS = [ "describe", "show_running_threads", @@ -37,6 +65,8 @@ class LamniGalilController(GalilController): "get_motor_limit_switch", "is_motor_on", "all_axes_referenced", + "lamni_lights_off", + "lamni_lights_on" ] def show_status_other(self): @@ -60,6 +90,47 @@ class LamniGalilController(GalilController): print("There is air pressure at the outer rotation radial.") swver = float(self.socket_put_and_receive("MGswver")) print(f"Lgalil LAMNI firmware version {swver:2.0f}.") + allaxref = int(float(self.socket_put_and_receive("MGallaxref"))) + print(f"Error statuts:") + if allaxref == 1: + print(f"Allaxref = 1, all OK.") + else: + print(f"Allaxref = {allaxref}. Not all axes are referenced or error introduced preventing motion.") + allaxrer = int(float(self.socket_put_and_receive("MGallaxrer"))) + print("\nallaxrer =", allaxrer) + print(self.decode_allaxrer(allaxrer)) + caperr = int(float(self.socket_put_and_receive("MGcaperr"))) + print("\nDecoding caperr =", caperr) + self.visualize_caperr(caperr) + + def decode_allaxrer(self, code: int) -> str: + """Return human-readable meaning of allaxrer code.""" + return self.allaxrer_table.get(code, "Unknown allaxrer code") + + def visualize_caperr(self, mask: int): + """Pretty-print a bitmask visualization for caperr.""" + print("\n=== CAPERR BITMASK VISUALIZER ===") + print(f"Raw value: {mask} (0x{mask:02X})") + print("----------------------------------\n") + + print("Bit | Hex | Active | Meaning") + print("----------------------------------") + + for bit, meaning in self.caperr_bits.items(): + active = "YES" if mask & bit else "no" + print(f"{bit:3d} | 0x{bit:02X} | {active:6} | {meaning}") + + print("\nActive flags:") + active_flags = [meaning for bit, meaning in self.caperr_bits.items() if mask & bit] + + if active_flags: + for f in active_flags: + print(" ✓", f) + else: + print(" (none)") + + print("\n==================================\n") + def lamni_lights_off(self): self.socket_put_confirmed("SB1") @@ -93,7 +164,7 @@ class LamniGalilReadbackSignal(GalilSignalRO): val = super().read() if self.parent.axis_Id_numeric == 2: try: - rt = self.parent.device_manager.devices[self.parent.rtx] + rt = self.parent.device_manager.devices[self.parent.rt] if rt.enabled: rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"]) except KeyError: @@ -147,7 +218,7 @@ class LamniGalilMotor(Device, PositionerBase): raise BECConfigError( "device_mapping has been specified but the device_manager cannot be accessed." ) - self.rt = self.device_mapping.get("rt") + self.rt = self.device_mapping.get("rt", "rtx") super().__init__( prefix, diff --git a/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py b/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py index 8e65acc..b529ba9 100644 --- a/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py +++ b/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py @@ -11,6 +11,7 @@ from ophyd.status import wait as status_wait from ophyd.utils import LimitError, ReadOnlyError from ophyd_devices.utils.controller import Controller, threadlocked from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected +from prettytable import PrettyTable from csaxs_bec.devices.omny.rt.rt_ophyd import RtCommunicationError, RtError @@ -51,6 +52,7 @@ class RtLamniController(Controller): _axes_per_controller = 3 USER_ACCESS = [ "socket_put_and_receive", + "socket_put", "set_rotation_angle", "feedback_disable", "feedback_enable_without_reset", @@ -62,6 +64,11 @@ class RtLamniController(Controller): "_set_axis_velocity_maximum_speed", "_position_sampling_single_read", "_position_sampling_single_reset_and_start_sampling", + "show_signal_strength_interferometer", + "show_interferometer_positions", + "show_analog_signals", + "show_feedback_status", + ] def __init__( @@ -208,8 +215,9 @@ class RtLamniController(Controller): @threadlocked def start_scan(self): - interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0]) - if interferometer_feedback_not_running == 1: + # interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0]) + # if interferometer_feedback_not_running == 1: + if not self.feedback_is_running(): logger.error( "Cannot start scan because feedback loop is not running or there is an interferometer error." ) @@ -270,6 +278,44 @@ class RtLamniController(Controller): "average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)}, } return signals + + def feedback_is_running(self) -> bool: + status = int(float((self.socket_put_and_receive("J2")).split(",")[0])) + return status == 0 # 0 means running, 1 means error/disabled + + def show_feedback_status(self): + if self.feedback_is_running(): + print("Loop is running, no error on interferometer.") + else: + print("Loop is not running, either it is turned off or an interferometer error occurred.") + + + def show_analog_signals(self) -> dict: + self.socket_put("As") # start sampling + time.sleep(0.01) + return_table = (self.socket_put_and_receive("Ar")).split(",") + + number_of_samples = int(float(return_table[0])) + signals = { + "number_of_samples": number_of_samples, + "piezo_0": float(return_table[1]), + "piezo_1": float(return_table[2]), + "cap_0": float(return_table[3]), + "cap_1": float(return_table[4]), + "cap_2": float(return_table[5]), + "cap_3": float(return_table[6]), + "cap_4": float(return_table[7]), + } + + t = PrettyTable() + t.title = f"LamNI Analog Signals ({number_of_samples} samples)" + t.field_names = ["Signal", "Value"] + for key, val in signals.items(): + if key != "number_of_samples": + t.add_row([key, f"{val:.4f}"]) + print(t) + + return signals def read_positions_from_sampler(self): # this was for reading after the scan completed @@ -347,6 +393,48 @@ class RtLamniController(Controller): ) return bool(return_table[0]) + def show_signal_strength_interferometer(self): + # trigger SSI averaging before reading + self.socket_put("J3") + time.sleep(0.05) + return_table = (self.socket_put_and_receive("J2")).split(",") + ssi_0 = float(return_table[1]) + ssi_1 = float(return_table[2]) + + return_table_angle = (self.socket_put_and_receive("J7")).split(",") + angle_running = bool(int(float(return_table_angle[0]))) + angle_position = float(return_table_angle[1]) + angle_signal = float(return_table_angle[2]) + + t = PrettyTable() + t.title = "Interferometer signal strength" + t.field_names = ["Axis", "Description", "Value", "Running"] + t.add_row([0, "ST FZP horizontal", ssi_0, "-"]) + t.add_row([1, "ST FZP vertical", ssi_1, "-"]) + t.add_row([2, "Angle interferometer", angle_signal, angle_running]) + print(t) + + if angle_running: + print(f"Angle interferometer position: {angle_position:.4f} um") + else: + print("Warning: angle interferometer is not running.") + + def show_interferometer_positions(self) -> dict: + return_table = (self.socket_put_and_receive("J4")).split(",") + loop_status = bool(int(float(return_table[0]))) + pos_y = float(return_table[1]) + pos_x = float(return_table[2]) + + t = PrettyTable() + t.title = "LamNI Interferometer Positions" + t.field_names = ["Axis", "Description", "Position (um)"] + t.add_row([0, "X", f"{pos_x:.4f}"]) + t.add_row([1, "Y", f"{pos_y:.4f}"]) + print(t) + print(f"Feedback loop running: {loop_status}") + + return {"x": pos_x, "y": pos_y, "loop_running": loop_status} + def feedback_enable_with_reset(self): if not self.feedback_status_angle_lamni(): self.feedback_disable_and_even_reset_lamni_angle_interferometer()