From 793eedb3f42197b41938f6db79dde29ebc1f08ce Mon Sep 17 00:00:00 2001 From: Holler Mirko Date: Wed, 11 Sep 2024 12:12:32 +0200 Subject: [PATCH] continued with mirror tweaking. started with laser tracker. implemented tweak curser function. --- .../plugins/omny/omny_general_tools.py | 90 +++++++++- .../plugins/omny/omny_rt.py | 169 +++++++++++------- csaxs_bec/devices/omny/rt/rt_omny_ophyd.py | 61 +++++-- 3 files changed, 240 insertions(+), 80 deletions(-) diff --git a/csaxs_bec/bec_ipython_client/plugins/omny/omny_general_tools.py b/csaxs_bec/bec_ipython_client/plugins/omny/omny_general_tools.py index d9a1ce6..d4c308d 100644 --- a/csaxs_bec/bec_ipython_client/plugins/omny/omny_general_tools.py +++ b/csaxs_bec/bec_ipython_client/plugins/omny/omny_general_tools.py @@ -1,11 +1,23 @@ import time import numpy as np +import sys +import termios +import tty +import fcntl +import os +import builtins from rich import box from rich.console import Console from rich.table import Table -from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose +#from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose + +if builtins.__dict__.get("bec") is not None: + bec = builtins.__dict__.get("bec") + dev = builtins.__dict__.get("dev") + umv = builtins.__dict__.get("umv") + umvr = builtins.__dict__.get("umvr") @@ -61,3 +73,79 @@ class OMNYTools: return False else: print("Please expicitely confirm y or n.") + + + def tweak_cursor(self,dev1, step1:float, dev2="none", step2:float="0", special_command = "none"): + if dev1 not in dev.enabled_devices: + print(f"Device 1 {dev} is not in enabled devices.") + return + if dev2 not in dev.enabled_devices and dev2 != "none": + print(f"Device 2 {dev} is not in enabled devices.") + return + # Save the current terminal settings + fd = sys.stdin.fileno() + old_term = termios.tcgetattr(fd) + try: + # Set the terminal to raw mode to capture single key presses + tty.setraw(fd) + # Set stdin to non-blocking mode + old_flags = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, old_flags | os.O_NONBLOCK) + print("Tweak Cursor."+self.BOLD+self.OKBLUE+"Press (q) to quit!\r"+self.ENDC) + while True: + try: + # Read single character input + key = sys.stdin.read(1) + if key == 'q': + print("\n\rExiting tweak mode\r") + break + elif key == '\x1b': # Escape sequences for arrow keys + next1, next2 = sys.stdin.read(2) + if next1 == '[': + if next2 == 'A': + #print("up") + if(dev2 != "none"): + umvr(dev2,step2) + if(special_command != "none"): + special_command() + elif next2 == 'B': + #print(" down") + if(dev2 != "none"): + umvr(dev2,-step2) + if(special_command != "none"): + special_command() + elif next2 == 'C': + #print("right") + umvr(dev1,step1) + if(special_command != "none"): + special_command() + elif next2 == 'D': + #print("left") + umvr(dev1,-step1) + if(special_command != "none"): + special_command() + + elif key == '+': + step1 = step1*2 + if(dev2 != "none"): + step2 = step2*2 + print(f"\rDouble step size. New step size: {step1}, {step2}\r") + elif key == '-': + step1 = step1/2 + if(dev2 != "none"): + step2 = step2/2 + print(f"\rHalf step size. New step size: {step1}, {step2}\r") + except IOError: + # No input available, keep looping + pass + + # Sleep for a short period to avoid high CPU usage + time.sleep(0.02) + + finally: + # Restore the terminal to its original state + termios.tcsetattr(fd, termios.TCSADRAIN, old_term) + fcntl.fcntl(fd, fcntl.F_SETFL, old_flags) + + + diff --git a/csaxs_bec/bec_ipython_client/plugins/omny/omny_rt.py b/csaxs_bec/bec_ipython_client/plugins/omny/omny_rt.py index ed58cb0..79f7bca 100644 --- a/csaxs_bec/bec_ipython_client/plugins/omny/omny_rt.py +++ b/csaxs_bec/bec_ipython_client/plugins/omny/omny_rt.py @@ -1,7 +1,11 @@ import time import numpy as np +import sys +import termios +import tty +import fcntl +import os -import curses from rich import box from rich.console import Console @@ -234,89 +238,113 @@ class OMNY_rt_client: print(f"Finished aligning axis") # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}") def omny_tweak_interferometer(self): - curses.wrapper(self._tweak_interferometer) + self._tweak_interferometer() - def _tweak_interferometer(self, stdscr): + def _tweak_interferometer(self): # global _omny_interferometer_opt_signalchannel # global _omny_interferometer_opt_mirrorname self.mirror_channel=-1 - # Set up the curses environment - #curses.curs_set(0) # Hide the cursor - stdscr.nodelay(True) # Do not wait for user input - stdscr.timeout(100) # Refresh every 100ms + + # Save the current terminal settings + fd = sys.stdin.fileno() + old_term = termios.tcgetattr(fd) print("Ready to tweak the interferometer. Press q to quit.") print("The arrows adjust directions.") print("Numbers select the mirror aligner.") - while True: - key = stdscr.getch() - - - if key == ord('q'): - self.mirror_amplitutde_increase=0 - self.mirror_channel=-1 - break - - if key == curses.KEY_LEFT: - if self.mirror_channel != -1: - self._omny_interferometer_openloop_steps(3, self.mirror_parameters[self.mirror_channel]["opt_steps1_pos"], self.mirror_parameters[self.mirror_channel]["opt_amplitude1_pos"]+self.mirror_amplitutde_increase) - - elif key == curses.KEY_RIGHT: - if self.mirror_channel != -1: - self._omny_interferometer_openloop_steps(3, -self.mirror_parameters[self.mirror_channel]["opt_steps1_neg"], self.mirror_parameters[self.mirror_channel]["opt_amplitude1_neg"]+self.mirror_amplitutde_increase) - - elif key == curses.KEY_DOWN: - if self.mirror_channel != -1: - self._omny_interferometer_openloop_steps(4, self.mirror_parameters[self.mirror_channel]["opt_steps2_pos"], self.mirror_parameters[self.mirror_channel]["opt_amplitude2_pos"]+self.mirror_amplitutde_increase) - - elif key == curses.KEY_UP: - if self.mirror_channel != -1: - self._omny_interferometer_openloop_steps(4, -self.mirror_parameters[self.mirror_channel]["opt_steps2_neg"], self.mirror_parameters[self.mirror_channel]["opt_amplitude2_neg"]+self.mirror_amplitutde_increase) + try: + # Set the terminal to raw mode to capture single key presses + tty.setraw(fd) + # Set stdin to non-blocking mode + old_flags = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, old_flags | os.O_NONBLOCK) + opt_mirrorname = "none" + max = 0 + while True: + try: + # Read single character input + key = sys.stdin.read(1) - elif key == ord('1'): - self.mirror_channel = 1 - elif key == ord('2'): - self.mirror_channel = 2 - elif key == ord('3'): - self.mirror_channel = 3 - elif key == ord('4'): - self.mirror_channel = 4 - elif key == ord('5'): - self.mirror_channel = 5 - elif key == ord('6'): - self.mirror_channel = 6 - elif key == ord('7'): - self.mirror_channel = 7 - elif key == ord('8'): - self.mirror_channel = 8 - elif key == ord('+'): - print("Increasing voltage amplitudes by 100.") - self.mirror_amplitutde_increase+=100 - elif key == ord('-'): - print("Decreasing voltage amplitudes by 100.") - self.mirror_amplitutde_increase-=100 - elif key == ord('a'): - if self.mirror_channel != -1: - self._omny_interferometer_optimize(self.mirror_channel, 3) - self._omny_interferometer_optimize(self.mirror_channel, 4) - self._omny_interferometer_optimize(self.mirror_channel, 3) - self._omny_interferometer_optimize(self.mirror_channel, 4) + if key == 'q': + self.mirror_amplitutde_increase=0 + self.mirror_channel=-1 + print("\n\rExiting tweak mode\r") + break + elif key == '\x1b': # Escape sequences for arrow keys + next1, next2 = sys.stdin.read(2) - if key >= ord('1') and key <= ord('8'): - dev.rtx.controller._omny_interferometer_switch_channel(self.mirror_channel) - - signal = dev.rtx.controller._omny_interferometer_get_signalsample(self.mirror_parameters[self.mirror_channel]["opt_signalchannel"], self.mirror_parameters[self.mirror_channel]["opt_averaging_time"]) + if next1 == '[': + printit=True + if next2 == 'A': + #print("up") + if self.mirror_channel != -1: + self._omny_interferometer_openloop_steps(4, -self.mirror_parameters[self.mirror_channel]["opt_steps2_neg"], self.mirror_parameters[self.mirror_channel]["opt_amplitude2_neg"]+self.mirror_amplitutde_increase) + elif next2 == 'B': + #print(" down") + if self.mirror_channel != -1: + self._omny_interferometer_openloop_steps(4, self.mirror_parameters[self.mirror_channel]["opt_steps2_pos"], self.mirror_parameters[self.mirror_channel]["opt_amplitude2_pos"]+self.mirror_amplitutde_increase) + elif next2 == 'C': + #print("right") + if self.mirror_channel != -1: + self._omny_interferometer_openloop_steps(3, -self.mirror_parameters[self.mirror_channel]["opt_steps1_neg"], self.mirror_parameters[self.mirror_channel]["opt_amplitude1_neg"]+self.mirror_amplitutde_increase) + elif next2 == 'D': + #print("left") + if self.mirror_channel != -1: + self._omny_interferometer_openloop_steps(3, self.mirror_parameters[self.mirror_channel]["opt_steps1_pos"], self.mirror_parameters[self.mirror_channel]["opt_amplitude1_pos"]+self.mirror_amplitutde_increase) + elif key.isdigit() and 1 <= int(key) <= 8: + self.mirror_channel = int(key) + opt_mirrorname = self.mirror_parameters[self.mirror_channel]["opt_mirrorname"] + autostop = self.mirror_parameters[self.mirror_channel]['opt_signal_stop'] + averaging_time = self.mirror_parameters[self.mirror_channel]["opt_averaging_time"] + print(f"\nSelected mirror channel {self.mirror_channel}: {opt_mirrorname}. Autostop {autostop}. Signal averaging time: {averaging_time}\r") + dev.rtx.controller._omny_interferometer_switch_channel(self.mirror_channel) + max=0 + printit=True + elif key == '+': + print("\nIncreasing voltage amplitudes by 100.\r") + self.mirror_amplitutde_increase+=100 + elif key == '-': + print("\nDecreasing voltage amplitudes by 100.\r") + self.mirror_amplitutde_increase-=100 + elif key == 'a': + if self.mirror_channel != -1: + self._omny_interferometer_optimize(self.mirror_channel, 3) + self._omny_interferometer_optimize(self.mirror_channel, 4) + self._omny_interferometer_optimize(self.mirror_channel, 3) + self._omny_interferometer_optimize(self.mirror_channel, 4) + if self.mirror_channel != -1 and printit: + printit = False + signal = dev.rtx.controller._omny_interferometer_get_signalsample(self.mirror_parameters[self.mirror_channel]["opt_signalchannel"], self.mirror_parameters[self.mirror_channel]["opt_averaging_time"]) + if signal > max: + max = signal + info_str = f"Channel {self.mirror_channel}, {opt_mirrorname}, Current signal: {signal:.0f}" + filling = " " * (50-len(info_str)) + # Calculate the number of filled and unfilled segments + length = 30 + percentage = signal / max + filled_length = int(length * percentage) + unfilled_length = length - filled_length + bar = '#' * filled_length + '-' * unfilled_length + print(info_str + filling + "0 " + bar + f" {max:.0f} (q)uit\r", end='') - time.sleep(0.01) - #print(f"Channel {self.mirror_channel} - {self.mirror_parameters[self.mirror_channel]['opt_mirrorname']} - Autostop {self.mirror_parameters[self.mirror_channel]['opt_signal_stop']} - Signal {signal}") + except IOError: + # No input available, keep looping + pass - dev.rtx.controller._omny_interferometer_switch_alloff() - self.mirror_channel = -1 - self.mirror_amplitutde_increase = 0 - dev.rtx.controller.show_signal_strength_interferometer() + # Sleep for a short period to avoid high CPU usage + time.sleep(0.02) + + finally: + # Restore the terminal to its original state + termios.tcsetattr(fd, termios.TCSADRAIN, old_term) + fcntl.fcntl(fd, fcntl.F_SETFL, old_flags) + dev.rtx.controller._omny_interferometer_switch_alloff() + self.mirror_channel = -1 + self.mirror_amplitutde_increase = 0 + dev.rtx.controller.show_signal_strength_interferometer() def omny_interferometer_align_incoupling_angle(self): @@ -336,6 +364,9 @@ class OMNY_rt_client: dev.rtx.controller.show_signal_strength_interferometer() self.mirror_channel=-1 + + def omny_interferometer_tweak_otrack(self): + self.OMNYTools.tweak_cursor(dev.otracky,.1,dev.otrackz,.1,special_command=dev.rtx.controller.laser_tracker_print_intensity_for_otrack_tweaking) def omny_interferometer_align_tracking(self): diff --git a/csaxs_bec/devices/omny/rt/rt_omny_ophyd.py b/csaxs_bec/devices/omny/rt/rt_omny_ophyd.py index bd96793..7632d84 100644 --- a/csaxs_bec/devices/omny/rt/rt_omny_ophyd.py +++ b/csaxs_bec/devices/omny/rt/rt_omny_ophyd.py @@ -39,6 +39,8 @@ class RtOMNY_mirror_switchbox_Error(Exception): class RtOMNYController(Controller): _axes_per_controller = 3 + red = "\x1b[91m" + white = "\x1b[0m" USER_ACCESS = [ "socket_put", "socket_put_and_receive", @@ -63,6 +65,9 @@ class RtOMNYController(Controller): "_omny_interferometer_switch_LED_on", "_omny_interferometer_switch_alloff", "_omny_interferometer_get_signalsample", + "laser_tracker_galil_enable", + "laser_tracker_galil_disable", + "laser_tracker_print_intensity_for_otrack_tweaking", ] def __init__( @@ -336,14 +341,50 @@ class RtOMNYController(Controller): self.interferometer_align_tracking() + def laser_tracker_print_intensity_for_otrack_tweaking(self): + self.laser_update_tracker_info() + _laser_tracker_intensity = self.tracker_info["tracker_intensity"] + print(f"\r PSD beam intensity: {_laser_tracker_intensity:.2f}\r") + def laser_tracker_show_all(self): - self.feedback_get_status_and_ssi() - t = PrettyTable() - t.title = f"Laser SSI Info" - t.field_names = ["Name", "Value"] - for key, val in self.ssi.items(): - t.add_row([key, val]) - print(t) + self.laser_update_tracker_info() + # self.tracker_info = { + # "beampos_y": tracker_values[5], + # "beampos_z": tracker_values[0], + # "tracker_intensity": tracker_values[2], + # "target_y": tracker_values[6], + # "threshold_intensity_y": tracker_values[8], + # "piezo_voltage_y": tracker_values[9], + # "enabled_y": tracker_values[10], + # "target_z": tracker_values[1], + # "threshold_intensity_z": tracker_values[3], + # "piezo_voltage_z": tracker_values[4], + # "enabled_z": bool(tracker_values[10]), + # } + enabled_y = self.tracker_info["enabled_y"] + print(f"Tracker enabled: {bool(enabled_y)}") + if self.tracker_info["tracker_intensity"] < self.tracker_info["threshold_intensity_y"]: + print(self.red+" LOW INTENSITY"+self.white) + _laser_tracker_intensity = self.tracker_info["tracker_intensity"] + print(f" PSD beam intensity: {_laser_tracker_intensity:.2f}") + _laser_tracker_y_beampos = self.tracker_info["beampos_y"] + print(f" Y beam position: {_laser_tracker_y_beampos:.2f}") + _laser_tracker_y_target = self.tracker_info["target_y"] + print(f" target position: {_laser_tracker_y_target:.2f}") + _laser_tracker_y_threshold_intensity = self.tracker_info["threshold_intensity_y"] + print(f" threshold intensity: {_laser_tracker_y_threshold_intensity:.2f}") + _laser_tracker_y_piezo_voltage = self.tracker_info["piezo_voltage_y"] + print(f" Piezo voltage: {_laser_tracker_y_piezo_voltage:.2f}") + _laser_tracker_z_beampos = self.tracker_info["beampos_z"] + print(f" Z beam position: {_laser_tracker_z_beampos:.2f}") + _laser_tracker_z_target = self.tracker_info["target_z"] + print(f" target position: {_laser_tracker_z_target:.2f}") + _laser_tracker_z_threshold_intensity = self.tracker_info["threshold_intensity_z"] + print(f" threshold intensity: {_laser_tracker_z_threshold_intensity:.2f}") + _laser_tracker_z_piezo_voltage = self.tracker_info["piezo_voltage_z"] + print(f" Piezo voltage: {_laser_tracker_z_piezo_voltage:.2f}") + print(" Reminder - there is also an upper threshold active in rt\n") + self.laser_tracker_galil_status() @@ -363,13 +404,13 @@ class RtOMNYController(Controller): def laser_tracker_galil_status(self): otracky_con = self.get_device_manager().devices.otracky.obj.controller if bool(float(otracky_con.socket_put_and_receive("MGtracken").strip())) == 0: - print("Tracking in the Galil Controller is disabled.") + print(self.red+"Tracking in the Galil Controller is disabled."+self.white) print("Use laser_tracker_galil_enable to enable.\n") return(0) else: print("Tracking in the Galil Controller is enabled.") - trackyct=bool(float(otracky_con.socket_put_and_receive("MGtrackyct").strip())) - trackzct=bool(float(otracky_con.socket_put_and_receive("MGtrackzct").strip())) + trackyct=int(float(otracky_con.socket_put_and_receive("MGtrackyct").strip())) + trackzct=int(float(otracky_con.socket_put_and_receive("MGtrackzct").strip())) print(f"Galil Trackcounters y={trackyct}, z={trackzct}") def show_signal_strength_interferometer(self):