continued with mirror tweaking. started with laser tracker. implemented tweak curser function.

This commit is contained in:
Holler Mirko
2024-09-11 12:12:32 +02:00
committed by wakonig_k
parent dbae426a87
commit 793eedb3f4
3 changed files with 240 additions and 80 deletions
@@ -1,11 +1,23 @@
import time
import numpy as np
import sys
import termios
import tty
import fcntl
import os
import builtins
from rich import box
from rich.console import Console
from rich.table import Table
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
#from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
@@ -61,3 +73,79 @@ class OMNYTools:
return False
else:
print("Please expicitely confirm y or n.")
def tweak_cursor(self,dev1, step1:float, dev2="none", step2:float="0", special_command = "none"):
if dev1 not in dev.enabled_devices:
print(f"Device 1 {dev} is not in enabled devices.")
return
if dev2 not in dev.enabled_devices and dev2 != "none":
print(f"Device 2 {dev} is not in enabled devices.")
return
# Save the current terminal settings
fd = sys.stdin.fileno()
old_term = termios.tcgetattr(fd)
try:
# Set the terminal to raw mode to capture single key presses
tty.setraw(fd)
# Set stdin to non-blocking mode
old_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, old_flags | os.O_NONBLOCK)
print("Tweak Cursor."+self.BOLD+self.OKBLUE+"Press (q) to quit!\r"+self.ENDC)
while True:
try:
# Read single character input
key = sys.stdin.read(1)
if key == 'q':
print("\n\rExiting tweak mode\r")
break
elif key == '\x1b': # Escape sequences for arrow keys
next1, next2 = sys.stdin.read(2)
if next1 == '[':
if next2 == 'A':
#print("up")
if(dev2 != "none"):
umvr(dev2,step2)
if(special_command != "none"):
special_command()
elif next2 == 'B':
#print(" down")
if(dev2 != "none"):
umvr(dev2,-step2)
if(special_command != "none"):
special_command()
elif next2 == 'C':
#print("right")
umvr(dev1,step1)
if(special_command != "none"):
special_command()
elif next2 == 'D':
#print("left")
umvr(dev1,-step1)
if(special_command != "none"):
special_command()
elif key == '+':
step1 = step1*2
if(dev2 != "none"):
step2 = step2*2
print(f"\rDouble step size. New step size: {step1}, {step2}\r")
elif key == '-':
step1 = step1/2
if(dev2 != "none"):
step2 = step2/2
print(f"\rHalf step size. New step size: {step1}, {step2}\r")
except IOError:
# No input available, keep looping
pass
# Sleep for a short period to avoid high CPU usage
time.sleep(0.02)
finally:
# Restore the terminal to its original state
termios.tcsetattr(fd, termios.TCSADRAIN, old_term)
fcntl.fcntl(fd, fcntl.F_SETFL, old_flags)
@@ -1,7 +1,11 @@
import time
import numpy as np
import sys
import termios
import tty
import fcntl
import os
import curses
from rich import box
from rich.console import Console
@@ -234,89 +238,113 @@ class OMNY_rt_client:
print(f"Finished aligning axis") # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}")
def omny_tweak_interferometer(self):
curses.wrapper(self._tweak_interferometer)
self._tweak_interferometer()
def _tweak_interferometer(self, stdscr):
def _tweak_interferometer(self):
# global _omny_interferometer_opt_signalchannel
# global _omny_interferometer_opt_mirrorname
self.mirror_channel=-1
# Set up the curses environment
#curses.curs_set(0) # Hide the cursor
stdscr.nodelay(True) # Do not wait for user input
stdscr.timeout(100) # Refresh every 100ms
# Save the current terminal settings
fd = sys.stdin.fileno()
old_term = termios.tcgetattr(fd)
print("Ready to tweak the interferometer. Press q to quit.")
print("The arrows adjust directions.")
print("Numbers select the mirror aligner.")
while True:
key = stdscr.getch()
if key == ord('q'):
self.mirror_amplitutde_increase=0
self.mirror_channel=-1
break
if key == curses.KEY_LEFT:
if self.mirror_channel != -1:
self._omny_interferometer_openloop_steps(3, self.mirror_parameters[self.mirror_channel]["opt_steps1_pos"], self.mirror_parameters[self.mirror_channel]["opt_amplitude1_pos"]+self.mirror_amplitutde_increase)
elif key == curses.KEY_RIGHT:
if self.mirror_channel != -1:
self._omny_interferometer_openloop_steps(3, -self.mirror_parameters[self.mirror_channel]["opt_steps1_neg"], self.mirror_parameters[self.mirror_channel]["opt_amplitude1_neg"]+self.mirror_amplitutde_increase)
elif key == curses.KEY_DOWN:
if self.mirror_channel != -1:
self._omny_interferometer_openloop_steps(4, self.mirror_parameters[self.mirror_channel]["opt_steps2_pos"], self.mirror_parameters[self.mirror_channel]["opt_amplitude2_pos"]+self.mirror_amplitutde_increase)
elif key == curses.KEY_UP:
if self.mirror_channel != -1:
self._omny_interferometer_openloop_steps(4, -self.mirror_parameters[self.mirror_channel]["opt_steps2_neg"], self.mirror_parameters[self.mirror_channel]["opt_amplitude2_neg"]+self.mirror_amplitutde_increase)
try:
# Set the terminal to raw mode to capture single key presses
tty.setraw(fd)
# Set stdin to non-blocking mode
old_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, old_flags | os.O_NONBLOCK)
opt_mirrorname = "none"
max = 0
while True:
try:
# Read single character input
key = sys.stdin.read(1)
elif key == ord('1'):
self.mirror_channel = 1
elif key == ord('2'):
self.mirror_channel = 2
elif key == ord('3'):
self.mirror_channel = 3
elif key == ord('4'):
self.mirror_channel = 4
elif key == ord('5'):
self.mirror_channel = 5
elif key == ord('6'):
self.mirror_channel = 6
elif key == ord('7'):
self.mirror_channel = 7
elif key == ord('8'):
self.mirror_channel = 8
elif key == ord('+'):
print("Increasing voltage amplitudes by 100.")
self.mirror_amplitutde_increase+=100
elif key == ord('-'):
print("Decreasing voltage amplitudes by 100.")
self.mirror_amplitutde_increase-=100
elif key == ord('a'):
if self.mirror_channel != -1:
self._omny_interferometer_optimize(self.mirror_channel, 3)
self._omny_interferometer_optimize(self.mirror_channel, 4)
self._omny_interferometer_optimize(self.mirror_channel, 3)
self._omny_interferometer_optimize(self.mirror_channel, 4)
if key == 'q':
self.mirror_amplitutde_increase=0
self.mirror_channel=-1
print("\n\rExiting tweak mode\r")
break
elif key == '\x1b': # Escape sequences for arrow keys
next1, next2 = sys.stdin.read(2)
if key >= ord('1') and key <= ord('8'):
dev.rtx.controller._omny_interferometer_switch_channel(self.mirror_channel)
signal = dev.rtx.controller._omny_interferometer_get_signalsample(self.mirror_parameters[self.mirror_channel]["opt_signalchannel"], self.mirror_parameters[self.mirror_channel]["opt_averaging_time"])
if next1 == '[':
printit=True
if next2 == 'A':
#print("up")
if self.mirror_channel != -1:
self._omny_interferometer_openloop_steps(4, -self.mirror_parameters[self.mirror_channel]["opt_steps2_neg"], self.mirror_parameters[self.mirror_channel]["opt_amplitude2_neg"]+self.mirror_amplitutde_increase)
elif next2 == 'B':
#print(" down")
if self.mirror_channel != -1:
self._omny_interferometer_openloop_steps(4, self.mirror_parameters[self.mirror_channel]["opt_steps2_pos"], self.mirror_parameters[self.mirror_channel]["opt_amplitude2_pos"]+self.mirror_amplitutde_increase)
elif next2 == 'C':
#print("right")
if self.mirror_channel != -1:
self._omny_interferometer_openloop_steps(3, -self.mirror_parameters[self.mirror_channel]["opt_steps1_neg"], self.mirror_parameters[self.mirror_channel]["opt_amplitude1_neg"]+self.mirror_amplitutde_increase)
elif next2 == 'D':
#print("left")
if self.mirror_channel != -1:
self._omny_interferometer_openloop_steps(3, self.mirror_parameters[self.mirror_channel]["opt_steps1_pos"], self.mirror_parameters[self.mirror_channel]["opt_amplitude1_pos"]+self.mirror_amplitutde_increase)
elif key.isdigit() and 1 <= int(key) <= 8:
self.mirror_channel = int(key)
opt_mirrorname = self.mirror_parameters[self.mirror_channel]["opt_mirrorname"]
autostop = self.mirror_parameters[self.mirror_channel]['opt_signal_stop']
averaging_time = self.mirror_parameters[self.mirror_channel]["opt_averaging_time"]
print(f"\nSelected mirror channel {self.mirror_channel}: {opt_mirrorname}. Autostop {autostop}. Signal averaging time: {averaging_time}\r")
dev.rtx.controller._omny_interferometer_switch_channel(self.mirror_channel)
max=0
printit=True
elif key == '+':
print("\nIncreasing voltage amplitudes by 100.\r")
self.mirror_amplitutde_increase+=100
elif key == '-':
print("\nDecreasing voltage amplitudes by 100.\r")
self.mirror_amplitutde_increase-=100
elif key == 'a':
if self.mirror_channel != -1:
self._omny_interferometer_optimize(self.mirror_channel, 3)
self._omny_interferometer_optimize(self.mirror_channel, 4)
self._omny_interferometer_optimize(self.mirror_channel, 3)
self._omny_interferometer_optimize(self.mirror_channel, 4)
if self.mirror_channel != -1 and printit:
printit = False
signal = dev.rtx.controller._omny_interferometer_get_signalsample(self.mirror_parameters[self.mirror_channel]["opt_signalchannel"], self.mirror_parameters[self.mirror_channel]["opt_averaging_time"])
if signal > max:
max = signal
info_str = f"Channel {self.mirror_channel}, {opt_mirrorname}, Current signal: {signal:.0f}"
filling = " " * (50-len(info_str))
# Calculate the number of filled and unfilled segments
length = 30
percentage = signal / max
filled_length = int(length * percentage)
unfilled_length = length - filled_length
bar = '#' * filled_length + '-' * unfilled_length
print(info_str + filling + "0 " + bar + f" {max:.0f} (q)uit\r", end='')
time.sleep(0.01)
#print(f"Channel {self.mirror_channel} - {self.mirror_parameters[self.mirror_channel]['opt_mirrorname']} - Autostop {self.mirror_parameters[self.mirror_channel]['opt_signal_stop']} - Signal {signal}")
except IOError:
# No input available, keep looping
pass
dev.rtx.controller._omny_interferometer_switch_alloff()
self.mirror_channel = -1
self.mirror_amplitutde_increase = 0
dev.rtx.controller.show_signal_strength_interferometer()
# Sleep for a short period to avoid high CPU usage
time.sleep(0.02)
finally:
# Restore the terminal to its original state
termios.tcsetattr(fd, termios.TCSADRAIN, old_term)
fcntl.fcntl(fd, fcntl.F_SETFL, old_flags)
dev.rtx.controller._omny_interferometer_switch_alloff()
self.mirror_channel = -1
self.mirror_amplitutde_increase = 0
dev.rtx.controller.show_signal_strength_interferometer()
def omny_interferometer_align_incoupling_angle(self):
@@ -336,6 +364,9 @@ class OMNY_rt_client:
dev.rtx.controller.show_signal_strength_interferometer()
self.mirror_channel=-1
def omny_interferometer_tweak_otrack(self):
self.OMNYTools.tweak_cursor(dev.otracky,.1,dev.otrackz,.1,special_command=dev.rtx.controller.laser_tracker_print_intensity_for_otrack_tweaking)
def omny_interferometer_align_tracking(self):
+51 -10
View File
@@ -39,6 +39,8 @@ class RtOMNY_mirror_switchbox_Error(Exception):
class RtOMNYController(Controller):
_axes_per_controller = 3
red = "\x1b[91m"
white = "\x1b[0m"
USER_ACCESS = [
"socket_put",
"socket_put_and_receive",
@@ -63,6 +65,9 @@ class RtOMNYController(Controller):
"_omny_interferometer_switch_LED_on",
"_omny_interferometer_switch_alloff",
"_omny_interferometer_get_signalsample",
"laser_tracker_galil_enable",
"laser_tracker_galil_disable",
"laser_tracker_print_intensity_for_otrack_tweaking",
]
def __init__(
@@ -336,14 +341,50 @@ class RtOMNYController(Controller):
self.interferometer_align_tracking()
def laser_tracker_print_intensity_for_otrack_tweaking(self):
self.laser_update_tracker_info()
_laser_tracker_intensity = self.tracker_info["tracker_intensity"]
print(f"\r PSD beam intensity: {_laser_tracker_intensity:.2f}\r")
def laser_tracker_show_all(self):
self.feedback_get_status_and_ssi()
t = PrettyTable()
t.title = f"Laser SSI Info"
t.field_names = ["Name", "Value"]
for key, val in self.ssi.items():
t.add_row([key, val])
print(t)
self.laser_update_tracker_info()
# self.tracker_info = {
# "beampos_y": tracker_values[5],
# "beampos_z": tracker_values[0],
# "tracker_intensity": tracker_values[2],
# "target_y": tracker_values[6],
# "threshold_intensity_y": tracker_values[8],
# "piezo_voltage_y": tracker_values[9],
# "enabled_y": tracker_values[10],
# "target_z": tracker_values[1],
# "threshold_intensity_z": tracker_values[3],
# "piezo_voltage_z": tracker_values[4],
# "enabled_z": bool(tracker_values[10]),
# }
enabled_y = self.tracker_info["enabled_y"]
print(f"Tracker enabled: {bool(enabled_y)}")
if self.tracker_info["tracker_intensity"] < self.tracker_info["threshold_intensity_y"]:
print(self.red+" LOW INTENSITY"+self.white)
_laser_tracker_intensity = self.tracker_info["tracker_intensity"]
print(f" PSD beam intensity: {_laser_tracker_intensity:.2f}")
_laser_tracker_y_beampos = self.tracker_info["beampos_y"]
print(f" Y beam position: {_laser_tracker_y_beampos:.2f}")
_laser_tracker_y_target = self.tracker_info["target_y"]
print(f" target position: {_laser_tracker_y_target:.2f}")
_laser_tracker_y_threshold_intensity = self.tracker_info["threshold_intensity_y"]
print(f" threshold intensity: {_laser_tracker_y_threshold_intensity:.2f}")
_laser_tracker_y_piezo_voltage = self.tracker_info["piezo_voltage_y"]
print(f" Piezo voltage: {_laser_tracker_y_piezo_voltage:.2f}")
_laser_tracker_z_beampos = self.tracker_info["beampos_z"]
print(f" Z beam position: {_laser_tracker_z_beampos:.2f}")
_laser_tracker_z_target = self.tracker_info["target_z"]
print(f" target position: {_laser_tracker_z_target:.2f}")
_laser_tracker_z_threshold_intensity = self.tracker_info["threshold_intensity_z"]
print(f" threshold intensity: {_laser_tracker_z_threshold_intensity:.2f}")
_laser_tracker_z_piezo_voltage = self.tracker_info["piezo_voltage_z"]
print(f" Piezo voltage: {_laser_tracker_z_piezo_voltage:.2f}")
print(" Reminder - there is also an upper threshold active in rt\n")
self.laser_tracker_galil_status()
@@ -363,13 +404,13 @@ class RtOMNYController(Controller):
def laser_tracker_galil_status(self):
otracky_con = self.get_device_manager().devices.otracky.obj.controller
if bool(float(otracky_con.socket_put_and_receive("MGtracken").strip())) == 0:
print("Tracking in the Galil Controller is disabled.")
print(self.red+"Tracking in the Galil Controller is disabled."+self.white)
print("Use laser_tracker_galil_enable to enable.\n")
return(0)
else:
print("Tracking in the Galil Controller is enabled.")
trackyct=bool(float(otracky_con.socket_put_and_receive("MGtrackyct").strip()))
trackzct=bool(float(otracky_con.socket_put_and_receive("MGtrackzct").strip()))
trackyct=int(float(otracky_con.socket_put_and_receive("MGtrackyct").strip()))
trackzct=int(float(otracky_con.socket_put_and_receive("MGtrackzct").strip()))
print(f"Galil Trackcounters y={trackyct}, z={trackzct}")
def show_signal_strength_interferometer(self):