work on omny gui, initial status progress bar

This commit is contained in:
Holler Mirko
2024-11-06 16:56:53 +01:00
parent b31a1ea996
commit 55aa7128e2
8 changed files with 1966 additions and 1689 deletions

View File

@@ -0,0 +1,165 @@
import time
import numpy as np
import sys
import termios
import tty
import fcntl
import os
import builtins
from rich import box
from rich.console import Console
from rich.table import Table
from bec_widgets.cli.client import BECDockArea
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
class OMNYGuiToolsError(Exception):
pass
class OMNYGuiTools:
def __init__(self, client):
self.gui = getattr(client, "gui", None)
self.fig200 = None
self.fig201 = None
self.fig202 = None
self.fig203 = None
self.progressbar = None
def omnygui_start_gui(self):
if self.gui is None or self.gui.gui_is_alive() is False:
self.gui = BECDockArea()
self.gui.show() # Dieses Kommado braucht in etwa ~3s. Dabei gibt aber einen print
self.fig200 = None
self.fig201 = None
self.fig202 = None
self.fig203 = None
def omnygui_stop_gui(self):
self.gui.close()
def _omnycam_parking(self):
self.gui_show_omnycam_parking()
def omnygui_show_omnycam_parking(self):
self.omnygui_start_gui()
if self.fig200 is None:
self._omnycam_clear()
self.fig200 = self.gui.add_dock(name="omnycam200").add_widget("BECImageWidget")
if self._omnycam_check_device_exists(dev.cam200):
fig = self.fig200.image("cam200")
fig.set_rotation(deg_90=3)
self.fig200.lock_aspect_ratio(True)
else:
print("Cannot open cam200. Device does not exist.")
self.fig203 = self.gui.add_dock(name="omnycam203").add_widget("BECImageWidget")
if self._omnycam_check_device_exists(dev.cam203):
fig = self.fig203.image("cam203")
fig.set_rotation(deg_90=3)
self.fig203.lock_aspect_ratio(True)
else:
print("Cannot open cam203. Device does not exist.")
try:
self.gui.remove_dock(name="default_figure")
except:
pass
def omnygui_remove_all_docks(self):
try:
self.gui.remove_dock(name="omnycam200")
except:
pass
try:
self.gui.remove_dock(name="omnycam201")
except:
pass
try:
self.gui.remove_dock(name="omnycam202")
except:
pass
try:
self.gui.remove_dock(name="omnycam203")
except:
pass
try:
self.gui.remove_dock(name="progress")
except:
pass
self.fig200 = None
self.fig201 = None
self.fig202 = None
self.fig203 = None
self.progressbar = None
def _omnycam_clear(self):
self.omnygui_remove_all_docks()
def _omnycam_check_device_exists(self, device):
try:
device
except:
return False
else:
return True
def _omnycam_samplestage(self):
self.gui_show_omnycam_samplestage()
def omnygui_show_omnycam_samplestage(self):
self.start_gui()
if self.fig201 is None:
self._omnycam_clear()
self.fig201 = self.gui.add_dock(name="omnycam201").add_widget("BECImageWidget")
if self._omnycam_check_device_exists(dev.cam201):
fig = self.fig201.image("cam201")
fig.set_rotation(deg_90=3)
self.fig201.lock_aspect_ratio(True)
else:
print("Cannot open cam201. Device does not exist.")
self.fig202 = self.gui.add_dock(name="omnycam202").add_widget("BECImageWidget")
if self._omnycam_check_device_exists(dev.cam202):
fig = self.fig202.image("cam202")
fig.set_rotation(deg_90=3)
self.fig202.lock_aspect_ratio(True)
else:
print("Cannot open cam202. Device does not exist.")
try:
self.gui.remove_dock(name="default_figure")
except:
pass
def omnygui_show_progress(self):
if self.progressbar is None:
# Add a new dock with a RingProgressBar widget
self.progressbar = self.gui.add_dock(name="progress").add_widget("RingProgressBar")
# Customize the size of the progress ring
self.progressbar.set_line_widths(20)
# Disable automatic updates and manually set the self.progressbar value
self.progressbar.enable_auto_updates(False)
# Set precision for the self.progressbar display
self.progressbar.set_precision(1) # Display self.progressbar with one decimal places
# Setting multiple rigns with different values
self.progressbar.set_number_of_bars(3)
# Set the values of the rings to 50, 75, and 25 from outer to inner ring
self.progressbar.set_value([50, 75, 25])
try:
self.gui.remove_dock(name="default_figure")
except:
pass
def _omnygui_update_progress(self):
if self.progressbar is not None:
projection = self.progress["projection"]
self.progressbar.set_value([projection, 75, 25])

View File

@@ -15,12 +15,13 @@ from csaxs_bec.bec_ipython_client.plugins.cSAXS import cSAXSBeamlineChecks
from csaxs_bec.bec_ipython_client.plugins.omny.omny_optics_mixin import OMNYOpticsMixin
from csaxs_bec.bec_ipython_client.plugins.omny.omny_alignment_mixin import OMNYAlignmentMixin
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
from csaxs_bec.bec_ipython_client.plugins.omny.omny_sample_transfer_mixin import OMNYSampleTransferMixin
from csaxs_bec.bec_ipython_client.plugins.omny.gui_tools import OMNYGuiTools
from csaxs_bec.bec_ipython_client.plugins.omny.omny_sample_transfer_mixin import (
OMNYSampleTransferMixin,
)
from csaxs_bec.bec_ipython_client.plugins.omny.omny_rt import OMNY_rt_client
from csaxs_bec.bec_ipython_client.plugins.omny.x_ray_eye_align import XrayEyeAlign
from bec_widgets.cli.client import BECDockArea as _BECDockArea
logger = bec_logger.logger
@@ -63,83 +64,100 @@ class OMNYInitStagesMixin:
dev.oeyex.enabled = True
dev.oeyez.enabled = True
dev.oeyey.enabled = True
def _omny_find_reference_loop(self, direction_endswitch, device, auto_retry=0):
retry_counter = 0
limits = device.get_motor_limit_switch()
if not limits[0] and not limits[1]:
raise OMNYInitError("Expect to be in a limit switch when calling the method find reference loop.")
raise OMNYInitError(
"Expect to be in a limit switch when calling the method find reference loop."
)
if limits[0] and limits[1]:
raise OMNYInitError("Both limit switches are triggered. This indicates an error.")
if limits[0]:
direction_endswitch_start = "reverse"
if limits[1]:
if limits[1]:
direction_endswitch_start = "forward"
if direction_endswitch_start != direction_endswitch:
raise OMNYInitError("Expect current endswitch to match the call of the method.")
if device.axis_is_referenced():
raise OMNYInitError("This axis is already referenced")
while not device.axis_is_referenced():
self._check_for_folerr_and_reset(device)
device.find_reference(raise_error=0)
time.sleep(1)
if not device.axis_is_referenced():
if auto_retry == 0:
if self.OMNYTools.yesno("Did not reference successfully. Try again to move to endswitch and find reference?"):
if self.OMNYTools.yesno(
"Did not reference successfully. Try again to move to endswitch and find reference?"
):
self._check_for_folerr_and_reset(device)
device.drive_axis_to_limit(direction_endswitch_start)
time.sleep(1)
elif auto_retry > 0:
if retry_counter < auto_retry:
self.OMNYTools.printgreen(f"Did not reference successfully. Try again to move to endswitch and find reference. Attempt {retry_counter+1} out of max {auto_retry}")
self.OMNYTools.printgreen(
f"Did not reference successfully. Try again to move to endswitch and find reference. Attempt {retry_counter+1} out of max {auto_retry}"
)
self._check_for_folerr_and_reset(device)
device.drive_axis_to_limit(direction_endswitch_start)
time.sleep(1)
retry_counter += 1
temperature = device.get_motor_temperature()
while temperature > 100:
self.OMNYTools.printgreen(f"The temperature of the motor is {temperature}, and larger than the threshold of 100 degC. Waiting 60 s for cool down.")
self.OMNYTools.printgreen(
f"The temperature of the motor is {temperature}, and larger than the threshold of 100 degC. Waiting 60 s for cool down."
)
time.sleep(60)
temperature = device.get_motor_temperature()
if retry_counter == auto_retry:
self.OMNYTools.printgreen("Maximum automatic init retries reached. Setting auto to 0 offering manual continuation.")
self.OMNYTools.printgreen(
"Maximum automatic init retries reached. Setting auto to 0 offering manual continuation."
)
auto_retry = 0
def _check_for_folerr_and_reset(self, device):
#if device.folerr_status():
# if device.folerr_status():
# user_input = (f"There is a following error in axis {device}. Reset and ignore to continue? y/n")
# if user_input == "y":
#for now we just reset and ignore. because it might just occur at the beginning of move to ES
# for now we just reset and ignore. because it might just occur at the beginning of move to ES
device._ogalil_folerr_reset_and_ignore()
time.sleep(0.3)
def omny_init_stages(self, autoconfirm=0, autoretry=0):
if self.OMNYTools.yesno("Starting initialization of OMNY stages.","y",autoconfirm=autoconfirm):
if self.OMNYTools.yesno(
"Starting initialization of OMNY stages.", "y", autoconfirm=autoconfirm
):
self.OMNYTools.printgreen("starting...")
else:
return
# #maybe replace 2 by something
# axis_id = dev.fheater._config["deviceConfig"].get("axis_Id")
# axis_id_numeric = self.axis_id_to_numeric(axis_id)
if not dev.oshield.controller.axis_is_referenced(2):
if self.OMNYTools.yesno("The smaract stage of the cryo shield has to be referenced first. Continue?","y"):
if self.OMNYTools.yesno(
"The smaract stage of the cryo shield has to be referenced first. Continue?", "y"
):
self.omny_init_smaract()
else:
return
if self.check_all_axes_of_OMNY_referenced():
if self.OMNYTools.yesno("All axes are referenced already. Reset and reference again?","n"):
if self.OMNYTools.yesno(
"All axes are referenced already. Reset and reference again?", "n"
):
self.OMNYTools.printgreen("ok then...")
else:
return
if np.fabs(dev.oshield.get().readback) > 0.1:
if self.OMNYTools.yesno("oshield is not around pos 0. Can the OSA be moved to -z limit then -x limit. Risk of collition!"):
if self.OMNYTools.yesno(
"oshield is not around pos 0. Can the OSA be moved to -z limit then -x limit. Risk of collition!"
):
dev.oosaz.drive_axis_to_limit("forward")
dev.oosax.drive_axis_to_limit("forward")
else:
@@ -147,110 +165,115 @@ class OMNYInitStagesMixin:
umv(dev.oshield, 0)
dev.ofzpx.controller.socket_put_confirmed("XQ#CRESET")
dev.osamx.controller.socket_put_confirmed("XQ#CRESET")
dev.osamy.controller.socket_put_confirmed("XQ#CRESET")
time.sleep(1)
if self.OMNYTools.yesno("drive fzp to -Z,-X,+Y limit and reference?","y",autoconfirm=autoconfirm):
if self.OMNYTools.yesno(
"drive fzp to -Z,-X,+Y limit and reference?", "y", autoconfirm=autoconfirm
):
dev.ofzpz.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("ofzpz reached limit")
dev.ofzpy.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("ofzpy reached limit")
dev.ofzpx.drive_axis_to_limit("reverse")
self.OMNYTools.printgreen("ofzpx reached limit")
#dev.ofzpz.find_reference()
self._omny_find_reference_loop("forward", dev.ofzpz,auto_retry=autoretry)
# dev.ofzpz.find_reference()
self._omny_find_reference_loop("forward", dev.ofzpz, auto_retry=autoretry)
self.OMNYTools.printgreen("ofzpz referenced")
#dev.ofzpy.find_reference()
self._omny_find_reference_loop("forward", dev.ofzpy,auto_retry=autoretry)
# dev.ofzpy.find_reference()
self._omny_find_reference_loop("forward", dev.ofzpy, auto_retry=autoretry)
self.OMNYTools.printgreen("ofzpy referenced")
#dev.ofzpx.find_reference()
self._omny_find_reference_loop("reverse", dev.ofzpx,auto_retry=autoretry)
# dev.ofzpx.find_reference()
self._omny_find_reference_loop("reverse", dev.ofzpx, auto_retry=autoretry)
self.OMNYTools.printgreen("ofzpx referenced")
#ranges:
#X -4.9 to 7.2
#Y -3.5 to 7.9
#Z -3 to 95.6
# ranges:
# X -4.9 to 7.2
# Y -3.5 to 7.9
# Z -3 to 95.6
else:
return
if self.OMNYTools.yesno("drive otransy stage to +Y limit?","y"):
if self.OMNYTools.yesno("drive otransy stage to +Y limit?", "y"):
dev.otransy.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("otrany reached limit")
else:
return
self.OMNYTools.printgreen("For the next step the gripper has to be at a good z position. Risk of collision!")
self.OMNYTools.printgreen(
"For the next step the gripper has to be at a good z position. Risk of collision!"
)
if self.OMNYTools.yesno("drive otransx stage to +X limit and find reference?"):
dev.otransx.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("otransx reached limit")
#dev.otransx.find_reference()
self._omny_find_reference_loop("forward", dev.otransx,auto_retry=autoretry)
# dev.otransx.find_reference()
self._omny_find_reference_loop("forward", dev.otransx, auto_retry=autoretry)
self.OMNYTools.printgreen("otransx referenced")
else:
return
if self.OMNYTools.yesno("drive otransz stage to +Z limit and reference?","y",autoconfirm=autoconfirm):
if self.OMNYTools.yesno(
"drive otransz stage to +Z limit and reference?", "y", autoconfirm=autoconfirm
):
dev.otransz.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("otransz reached limit")
#dev.otransz.find_reference()
self._omny_find_reference_loop("forward", dev.otransz,auto_retry=autoretry)
# dev.otransz.find_reference()
self._omny_find_reference_loop("forward", dev.otransz, auto_retry=autoretry)
self.OMNYTools.printgreen("otransz referenced")
else:
return
#get xeye out of the way
if self.OMNYTools.yesno("drive oeyey to +Y limit?","y"):
# get xeye out of the way
if self.OMNYTools.yesno("drive oeyey to +Y limit?", "y"):
dev.oeyey.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("oeyey reached limit")
else:
return
if self.OMNYTools.yesno("drive oeyex to -X limit?","y",autoconfirm=autoconfirm):
if self.OMNYTools.yesno("drive oeyex to -X limit?", "y", autoconfirm=autoconfirm):
dev.oeyex.drive_axis_to_limit("reverse")
self.OMNYTools.printgreen("oeyex reached limit")
else:
return
if self.OMNYTools.yesno("drive oeyez to +Z limit?","y",autoconfirm=autoconfirm):
if self.OMNYTools.yesno("drive oeyez to +Z limit?", "y", autoconfirm=autoconfirm):
dev.oeyez.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("oeyez reached limit")
else:
return
if self.OMNYTools.yesno("reference oeyex, then oeyez?","y",autoconfirm=autoconfirm):
#dev.oeyex.find_reference()
self._omny_find_reference_loop("reverse", dev.oeyex,auto_retry=autoretry)
if self.OMNYTools.yesno("reference oeyex, then oeyez?", "y", autoconfirm=autoconfirm):
# dev.oeyex.find_reference()
self._omny_find_reference_loop("reverse", dev.oeyex, auto_retry=autoretry)
self.OMNYTools.printgreen("oeyex referenced")
#dev.oeyez.find_reference()
self._omny_find_reference_loop("forward", dev.oeyez,auto_retry=autoretry)
# dev.oeyez.find_reference()
self._omny_find_reference_loop("forward", dev.oeyez, auto_retry=autoretry)
self.OMNYTools.printgreen("oeyez referenced")
else:
return
if self.OMNYTools.yesno("find reference mark of oeyey?","y",autoconfirm=autoconfirm):
#print("oeyey now in limit")
if self.OMNYTools.yesno("find reference mark of oeyey?", "y", autoconfirm=autoconfirm):
# print("oeyey now in limit")
dev.oeyez.drive_axis_to_limit("forward")
dev.oeyey.controller.socket_put_confirmed("indspeed[7]=2000")
#dev.oeyey.find_reference()
self._omny_find_reference_loop("forward", dev.oeyey,auto_retry=autoretry)
# dev.oeyey.find_reference()
self._omny_find_reference_loop("forward", dev.oeyey, auto_retry=autoretry)
self.OMNYTools.printgreen("oeyey is now referenced")
#ensure closed shuttle and shuttle aligner down
# ensure closed shuttle and shuttle aligner down
if not dev.oshuttleopen.get_motor_limit_switch()[0]:
dev.oshuttleopen.drive_axis_to_limit("reverse")
if not dev.oshuttleopen.get_motor_limit_switch()[0]:
raise OMNYInitError("Failed to drive shuttle opener to reverse limit")
else:
self.OMNYTools.printgreen("shuttle opener is down, i.e. in safe position to continue.")
#move shuttle aligner down
#self._otransfer_shuttle_aligner_down()
#the above is ins sample transfer mixin. can it be accessible from here? so code below
# move shuttle aligner down
# self._otransfer_shuttle_aligner_down()
# the above is ins sample transfer mixin. can it be accessible from here? so code below
if not dev.oshuttlealign.get_motor_limit_switch()[1]:
dev.oshuttlealign.drive_axis_to_limit("forward")
@@ -259,131 +282,142 @@ class OMNYInitStagesMixin:
else:
self.OMNYTools.printgreen("shuttle aligner is down, i.e. in safe position to continue.")
if self.OMNYTools.yesno("drive parking stage to +Z limit and reference?","y",autoconfirm=autoconfirm):
if self.OMNYTools.yesno(
"drive parking stage to +Z limit and reference?", "y", autoconfirm=autoconfirm
):
dev.oparkz.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("oparkz is in limit")
#dev.oparkz.find_reference()
self._omny_find_reference_loop("forward", dev.oparkz,auto_retry=autoretry)
# dev.oparkz.find_reference()
self._omny_find_reference_loop("forward", dev.oparkz, auto_retry=autoretry)
self.OMNYTools.printgreen("oparkz is referenced")
else:
return
if self.OMNYTools.yesno("OK, tricky part. Can the OSA be moved to -z limit then -x limit. Risk of collition!"):
if self.OMNYTools.yesno(
"OK, tricky part. Can the OSA be moved to -z limit then -x limit. Risk of collition!"
):
dev.oosaz.drive_axis_to_limit("forward")
dev.oosax.drive_axis_to_limit("forward")
else:
raise OMNYInitError("Automatic init not possible in this case. Please initialize manually.")
raise OMNYInitError(
"Automatic init not possible in this case. Please initialize manually."
)
if self.OMNYTools.yesno("drive otrackz to -Z limit?","y",autoconfirm=autoconfirm):
if self.OMNYTools.yesno("drive otrackz to -Z limit?", "y", autoconfirm=autoconfirm):
dev.otrackz.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("otrackz is in limit")
else:
return
if self.OMNYTools.yesno("drive osamx to +X limit?","y",autoconfirm=autoconfirm):
if self.OMNYTools.yesno("drive osamx to +X limit?", "y", autoconfirm=autoconfirm):
dev.osamx.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("osamx is in limit")
else:
return
if self.OMNYTools.yesno("drive oosay to +Y limit?","y",autoconfirm=autoconfirm):
if self.OMNYTools.yesno("drive oosay to +Y limit?", "y", autoconfirm=autoconfirm):
dev.oosay.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("oosay is in limit")
else:
return
if self.OMNYTools.yesno("find reference mark of osamx?","y",autoconfirm=autoconfirm):
self._omny_find_reference_loop("forward", dev.osamx,auto_retry=autoretry)
#dev.osamx.find_reference()
if self.OMNYTools.yesno("find reference mark of osamx?", "y", autoconfirm=autoconfirm):
self._omny_find_reference_loop("forward", dev.osamx, auto_retry=autoretry)
# dev.osamx.find_reference()
self.OMNYTools.printgreen("osamx is referenced")
#osamx range limit -2 to 7 good for cryolink
# osamx range limit -2 to 7 good for cryolink
#if osamz is ever needed, here is how it could be initialized (in spec code)
#if (!_ogalil_axis_is_referenced_mne(osamx) || fabs(A[osamx]) > 0.1 )
#{
#printf("The osamx stage is not referenced or is not at the reference position. Aborting.\n")
#exit
#}
#drive osamz to -Z limit
#+Z direction is critical because of cryolink connection
#if(!yesno("drive osamz to -Z limit and perform reference search?",1))
# if osamz is ever needed, here is how it could be initialized (in spec code)
# if (!_ogalil_axis_is_referenced_mne(osamx) || fabs(A[osamx]) > 0.1 )
# {
# printf("The osamx stage is not referenced or is not at the reference position. Aborting.\n")
# exit
#osamz to limit
#_ogalil_drive_to_limit_mne(osamz,1)
#_ogalil_find_reference_mark_mne(osamz)
#set_lm osamz -.5 .5
# }
# drive osamz to -Z limit
# +Z direction is critical because of cryolink connection
# if(!yesno("drive osamz to -Z limit and perform reference search?",1))
# exit
# osamz to limit
# _ogalil_drive_to_limit_mne(osamz,1)
# _ogalil_find_reference_mark_mne(osamz)
# set_lm osamz -.5 .5
else:
return
if self.OMNYTools.yesno("drive osamy to -Y limit?","y"):
if self.OMNYTools.yesno("drive osamy to -Y limit?", "y"):
dev.osamy.drive_axis_to_limit("reverse")
self.OMNYTools.printgreen("osamy is in limit")
else:
return
if self.OMNYTools.yesno("perform reference search of oosax?","y"):
if self.OMNYTools.yesno("perform reference search of oosax?", "y"):
self._omny_find_reference_loop("forward", dev.oosax)
#dev.oosax.find_reference()
# dev.oosax.find_reference()
self.OMNYTools.printgreen("oosax is referenced")
else:
return
if self.OMNYTools.yesno("find reference mark of oosay?","y"):
if self.OMNYTools.yesno("find reference mark of oosay?", "y"):
self._omny_find_reference_loop("forward", dev.oosay)
#dev.oosay.find_reference()
# dev.oosay.find_reference()
self.OMNYTools.printgreen("oosay is referenced")
else:
return
if self.OMNYTools.yesno("find reference mark of osamy?","y"):
if self.OMNYTools.yesno("find reference mark of osamy?", "y"):
self._omny_find_reference_loop("reverse", dev.osamy)
#dev.osamy.find_reference()
# dev.osamy.find_reference()
self.OMNYTools.printgreen("osamy is referenced")
else:
return
if self.OMNYTools.yesno("find reference mark of oosaz?","y"):
if self.OMNYTools.yesno("find reference mark of oosaz?", "y"):
self._omny_find_reference_loop("forward", dev.oosaz)
#dev.oosaz.find_reference()
# dev.oosaz.find_reference()
self.OMNYTools.printgreen("oosaz is referenced")
else:
return
if self.OMNYTools.yesno("find endswitch and reference mark of osamroy?","y",autoconfirm=autoconfirm):
if self.OMNYTools.yesno(
"find endswitch and reference mark of osamroy?", "y", autoconfirm=autoconfirm
):
dev.osamroy.drive_axis_to_limit("reverse")
self.OMNYTools.printgreen("osamroy is in limit")
self._omny_find_reference_loop("reverse", dev.osamroy,auto_retry=autoretry)
#dev.osamroy.find_reference()
self._omny_find_reference_loop("reverse", dev.osamroy, auto_retry=autoretry)
# dev.osamroy.find_reference()
self.OMNYTools.printgreen("osamroy is referenced")
else:
return
if self.OMNYTools.yesno("find reference mark of transfer Y?",autoconfirm=autoconfirm):
self._omny_find_reference_loop("forward", dev.otransy,auto_retry=autoretry)
#dev.otransy.find_reference()
if self.OMNYTools.yesno("find reference mark of transfer Y?", autoconfirm=autoconfirm):
self._omny_find_reference_loop("forward", dev.otransy, auto_retry=autoretry)
# dev.otransy.find_reference()
self.OMNYTools.printgreen("otransy is referenced")
else:
return
if self.OMNYTools.yesno("find endswitches and reference marks of tracking stage system?","y",autoconfirm=autoconfirm):
if self.OMNYTools.yesno(
"find endswitches and reference marks of tracking stage system?",
"y",
autoconfirm=autoconfirm,
):
dev.otracky.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("otracky is in limit")
self._omny_find_reference_loop("forward", dev.otracky,auto_retry=autoretry)
#dev.otracky.find_reference()
self._omny_find_reference_loop("forward", dev.otracky, auto_retry=autoretry)
# dev.otracky.find_reference()
self.OMNYTools.printgreen("otracky is referenced")
self._omny_find_reference_loop("forward", dev.otrackz,auto_retry=autoretry)
#dev.otrackz.find_reference()
self._omny_find_reference_loop("forward", dev.otrackz, auto_retry=autoretry)
# dev.otrackz.find_reference()
self.OMNYTools.printgreen("otrackz is referenced")
else:
return
#all three controllers
# all three controllers
dev.ofzpx.controller._ogalil_folerr_not_ignore()
dev.osamx.controller._ogalil_folerr_not_ignore()
dev.osamy.controller._ogalil_folerr_not_ignore()
#adjust acceleration of tracking stages
# adjust acceleration of tracking stages
dev.osamy.controller.socket_put_confirmed("ACE=11264")
dev.osamy.controller.socket_put_confirmed("ACB=11264")
dev.osamy.controller.socket_put_confirmed("DCE=11264")
@@ -395,22 +429,24 @@ class OMNYInitStagesMixin:
self._setalllimitsomny()
def omny_init_smaract(self):
#_smar_rt_set_max_closed_loop_frequency(0,0,3000)
#_smar_rt_set_max_closed_loop_frequency(0,1,3000)
#_smar_rt_set_max_closed_loop_frequency(0,2,3000)
# _smar_rt_set_max_closed_loop_frequency(0,0,3000)
# _smar_rt_set_max_closed_loop_frequency(0,1,3000)
# _smar_rt_set_max_closed_loop_frequency(0,2,3000)
dev.oshield.controller.find_reference_mark(1, 0, 1000, 1)
time.sleep(2)
dev.oshield.controller.find_reference_mark(0, 0, 1000, 1)
time.sleep(2)
if not dev.otransy.get_motor_limit_switch()[1]:
if self.OMNYTools.yesno("drive otransy stage to +Y limit?","y"):
if self.OMNYTools.yesno("drive otransy stage to +Y limit?", "y"):
dev.otransy.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("otransy reached limit")
else:
self.OMNYTools.printgreen("otransy is in upper limit, i.e. safe condition")
if self.OMNYTools.yesno("Problematic part: Can the OSA be moved to -z limit then -x limit. Risk of collition!"):
if self.OMNYTools.yesno(
"Problematic part: Can the OSA be moved to -z limit then -x limit. Risk of collition!"
):
dev.oosaz.drive_axis_to_limit("forward")
dev.oosax.drive_axis_to_limit("forward")
else:
@@ -423,14 +459,14 @@ class OMNYInitStagesMixin:
dev.ocsx.limits = [-2, 4.3]
dev.ocsy.limits = [-1, 3]
dev.oshield.controller.set_closed_loop_move_speed(0,2)
dev.oshield.controller.set_closed_loop_move_speed(1,2)
dev.oshield.controller.set_closed_loop_move_speed(2,2)
dev.oshield.controller.set_closed_loop_move_speed(0, 2)
dev.oshield.controller.set_closed_loop_move_speed(1, 2)
dev.oshield.controller.set_closed_loop_move_speed(2, 2)
#enable sensor for repeatable open loop motion
# enable sensor for repeatable open loop motion
dev.oshield.controller.socket_put_and_receive("SSE1")
if self.OMNYTools.yesno("Move CS out of the beam path?","y"):
if self.OMNYTools.yesno("Move CS out of the beam path?", "y"):
umv(dev.ocsx, 4.3, dev.ocsy, 2)
self.OMNYTools.printgreen("Finished initialization of OMNY smaract system")
@@ -454,7 +490,7 @@ class OMNYInitStagesMixin:
dev.otransx.limits = [-459.0000, 33.0000]
dev.otransy.limits = [-41.0000, 1.0000]
dev.otransz.limits = [-70.5000, 11.0000]
dev.oparkz .limits = [-165.0000, 1.0000]
dev.oparkz.limits = [-165.0000, 1.0000]
dev.otracky.limits = [-3.0000, -7.0000]
dev.otrackz.limits = [-2.0000, 1.0000]
dev.ocsy.limits = [-2.0000, 2.0000]
@@ -487,33 +523,29 @@ class OMNYInitStagesMixin:
dev.otracky.limits = [-7, -3]
dev.otrackz.limits = [-2, 1]
otracky_start_pos = self.OMNYTools._get_user_param_safe("otracky","start_pos")
otrackz_start_pos = self.OMNYTools._get_user_param_safe("otrackz","start_pos")
otracky_start_pos = self.OMNYTools._get_user_param_safe("otracky", "start_pos")
otrackz_start_pos = self.OMNYTools._get_user_param_safe("otrackz", "start_pos")
umv(dev.otracky, otracky_start_pos, dev.otrackz, otrackz_start_pos)
#adjust acceleration and speed of osamx
# adjust acceleration and speed of osamx
dev.osamx.controller.socket_put_confirmed("SPA=100")
dev.osamx.controller.socket_put_confirmed("ACA=1878")
dev.osamx.controller.socket_put_confirmed("DCA=187")
#adjust acceleration of tracking stages
# adjust acceleration of tracking stages
dev.osamy.controller.socket_put_confirmed("ACE=11264")
dev.osamy.controller.socket_put_confirmed("ACB=11264")
dev.osamy.controller.socket_put_confirmed("DCE=11264")
dev.osamy.controller.socket_put_confirmed("DCB=11264")
def check_all_axes_of_OMNY_referenced(self) -> bool:
if (
dev.ofzpx.controller.all_axes_referenced()):
if dev.ofzpx.controller.all_axes_referenced():
self.OMNYTools.printgreen("All axes of OMNY are referenced.")
return True
else:
return False
class OMNY(
OMNYInitStagesMixin,
OMNYSampleTransferMixin,
@@ -521,6 +553,7 @@ class OMNY(
OMNYOpticsMixin,
cSAXSBeamlineChecks,
OMNY_rt_client,
OMNYGuiTools,
):
def __init__(self, client):
super().__init__()
@@ -543,33 +576,22 @@ class OMNY(
self.corr_pos_y_2 = []
self.corr_angle_y_2 = []
self.progress = {}
self.progress["subtomo"] = 0
self.progress["subtomo_projection"] = 0
self.progress["subtomo_total_projections"] = 0
self.progress["projection"] = 0
self.progress["total_projections"] = 0
self.progress["angle"] = 0
self.progress["tomo_type"] = 0
self.OMNYTools = OMNYTools(self.client)
OMNY_rt_client.__init__(self)
self.align = XrayEyeAlign(self.client, self)
self.gui = getattr(client, 'gui',None)
self.fig = None
def start_gui(self):
if self.gui is None:
self.gui = _BECDockArea()
self.gui.show()
def stop_gui(self):
if self.gui is not None:
self.fig = None
self.gui.close()
self.gui = None
def add_image_dock(self):
self.fig = self.gui.add_dock(name="cam1").add_widget("BECImageWidget")
self.fig.image("cam200")
self.fig.set_rotation(3)
#self.gui.remove_dock(dock_name="Waveform Dock")
OMNYGuiTools.__init__(self, self.client)
def start_x_ray_eye_alignment(self):
if self.OMNYTools.yesno(
"Starting Xrayeye alignment. Deleting any potential existing alignment for this sample.","y"
"Starting Xrayeye alignment. Deleting any potential existing alignment for this sample.",
"y",
):
self.align = XrayEyeAlign(self.client, self)
try:
@@ -1319,7 +1341,7 @@ class OMNY(
)
print(f"\nSample name: {self.sample_name}\n")
if self.OMNYTools.yesno("Are these parameters correctly set for your scan?","y"):
if self.OMNYTools.yesno("Are these parameters correctly set for your scan?", "y"):
print("... excellent!")
else:
self.tomo_countingtime = self._get_val("<ctime> s", self.tomo_countingtime, float)
@@ -1347,16 +1369,18 @@ class OMNY(
print(f"The angular step in a subtomogram it will be {self.tomo_angle_stepsize}")
if self.tomo_type == 2:
code = self._get_val("This mode causes significant wear in OMNY. Enter authorization code.",0,str,)
code = self._get_val(
"This mode causes significant wear in OMNY. Enter authorization code.", 0, str
)
if code == "x12sa":
self.golden_ratio_bunch_size = self._get_val(
"Number of projections sorted per bunch (minimum 100)",
self.golden_ratio_bunch_size,
int,
)
if self.golden_ratio_bunch_size<100:
if self.golden_ratio_bunch_size < 100:
print("Minimum of 100 selected.")
self.golden_ratio_bunch_size=100
self.golden_ratio_bunch_size = 100
self.golden_max_number_of_projections = self._get_val(
"Stop after number of projections (zero for endless)",
self.golden_max_number_of_projections,
@@ -1368,11 +1392,15 @@ class OMNY(
int,
)
else:
print("Wrong authorization code. Tomo type 1 selected: 2 sub-tomograms selected.")
print(
"Wrong authorization code. Tomo type 1 selected: 2 sub-tomograms selected."
)
self.tomo_type = 1
if self.tomo_type == 3:
code = self._get_val("This mode causes significant wear in OMNY. Enter authorization code.",0,str,)
code = self._get_val(
"This mode causes significant wear in OMNY. Enter authorization code.", 0, str
)
if code == "x12sa":
numprj = self._get_val(
"Number of projections per sub-tomogram (minimum 100)",
@@ -1394,7 +1422,9 @@ class OMNY(
int,
)
else:
print("Wrong authorization code. Tomo type 1 selected: 2 sub-tomograms selected.")
print(
"Wrong authorization code. Tomo type 1 selected: 2 sub-tomograms selected."
)
self.tomo_type = 1
@staticmethod
@@ -1452,7 +1482,7 @@ class OMNY(
f"{'Stitching:':<{padding}}{stitching:>{padding}}\n",
f"{'Number of individual sub-tomograms:':<{padding}}{8:>{padding}}\n",
f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n",
"Add current temperature and pressure status"
"Add current temperature and pressure status",
]
content = "".join(content)
user_target = os.path.expanduser(f"~/Data10/documentation/tomo_scan_ID_{self.tomo_id}.pdf")

View File

@@ -8,6 +8,7 @@ from rich.table import Table
from typeguard import typechecked
from bec_lib import bec_logger
logger = bec_logger.logger
@@ -192,7 +193,7 @@ class OMNYAlignmentMixin:
print(
f"Loading default mirror correction from file {correction_file} containing {int_num_elements} elements."
)
#print(corr_pos)
# print(corr_pos)
return corr_pos, corr_angle
def read_additional_correction_x(self, correction_file: str):

View File

@@ -11,7 +11,7 @@ from rich import box
from rich.console import Console
from rich.table import Table
#from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
@@ -20,21 +20,21 @@ if builtins.__dict__.get("bec") is not None:
umvr = builtins.__dict__.get("umvr")
class OMNYToolsError(Exception):
pass
class OMNYTools:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
def __init__(self, client) -> None:
self.client = client
@@ -45,19 +45,19 @@ class OMNYTools:
if not param or param.get(var) is None:
raise ValueError(f"Device {device} has no user parameter definition for {var}.")
return param.get(var)
def printgreen(self,string:str):
def printgreen(self, string: str):
print(self.OKGREEN + string + self.ENDC)
def printgreenbold(self,string:str):
def printgreenbold(self, string: str):
print(self.BOLD + self.OKGREEN + string + self.ENDC)
def yesno(self,message :str,default="none",autoconfirm=0) -> bool:
def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
if autoconfirm and default == "y":
self.printgreen (message + " Automatically confirming default: yes")
self.printgreen(message + " Automatically confirming default: yes")
return True
elif autoconfirm and default == "n":
self.printgreen (message + " Automatically confirming default: no")
self.printgreen(message + " Automatically confirming default: no")
return False
if default == "y":
message_ending = " [Y]/n? "
@@ -66,20 +66,25 @@ class OMNYTools:
else:
message_ending = " y/n? "
while True:
user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
if (user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes") or (default == "y" and user_input == ""):
user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
if (
user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes"
) or (default == "y" and user_input == ""):
return True
if (user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No") or (default == "n" and user_input == ""):
if (
user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No"
) or (default == "n" and user_input == ""):
return False
else:
print("Please expicitely confirm y or n.")
def tweak_cursor(self,dev1, step1:float, dev2="none", step2:float="0", special_command = "none"):
def tweak_cursor(
self, dev1, step1: float, dev2="none", step2: float = "0", special_command="none"
):
if dev1 not in dev.enabled_devices:
print(f"Device 1 {dev} is not in enabled devices.")
return
if dev2 not in dev.enabled_devices and dev2 != "none":
if dev2 not in dev.enabled_devices and dev2 != "none":
print(f"Device 2 {dev} is not in enabled devices.")
return
# Save the current terminal settings
@@ -91,49 +96,49 @@ class OMNYTools:
# Set stdin to non-blocking mode
old_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, old_flags | os.O_NONBLOCK)
print("Tweak Cursor."+self.BOLD+self.OKBLUE+"Press (q) to quit!\r"+self.ENDC)
print("Tweak Cursor." + self.BOLD + self.OKBLUE + "Press (q) to quit!\r" + self.ENDC)
while True:
try:
# Read single character input
key = sys.stdin.read(1)
if key == 'q':
if key == "q":
print("\n\rExiting tweak mode\r")
break
elif key == '\x1b': # Escape sequences for arrow keys
elif key == "\x1b": # Escape sequences for arrow keys
next1, next2 = sys.stdin.read(2)
if next1 == '[':
if next2 == 'A':
#print("up")
if(dev2 != "none"):
umvr(dev2,step2)
if(special_command != "none"):
if next1 == "[":
if next2 == "A":
# print("up")
if dev2 != "none":
umvr(dev2, step2)
if special_command != "none":
special_command()
elif next2 == 'B':
#print(" down")
if(dev2 != "none"):
umvr(dev2,-step2)
if(special_command != "none"):
elif next2 == "B":
# print(" down")
if dev2 != "none":
umvr(dev2, -step2)
if special_command != "none":
special_command()
elif next2 == 'C':
#print("right")
umvr(dev1,step1)
if(special_command != "none"):
elif next2 == "C":
# print("right")
umvr(dev1, step1)
if special_command != "none":
special_command()
elif next2 == 'D':
#print("left")
umvr(dev1,-step1)
if(special_command != "none"):
elif next2 == "D":
# print("left")
umvr(dev1, -step1)
if special_command != "none":
special_command()
elif key == '+':
step1 = step1*2
if(dev2 != "none"):
step2 = step2*2
elif key == "+":
step1 = step1 * 2
if dev2 != "none":
step2 = step2 * 2
print(f"\rDouble step size. New step size: {step1}, {step2}\r")
elif key == '-':
step1 = step1/2
if(dev2 != "none"):
step2 = step2/2
elif key == "-":
step1 = step1 / 2
if dev2 != "none":
step2 = step2 / 2
print(f"\rHalf step size. New step size: {step1}, {step2}\r")
except IOError:
# No input available, keep looping
@@ -146,6 +151,3 @@ class OMNYTools:
# Restore the terminal to its original state
termios.tcsetattr(fd, termios.TCSADRAIN, old_term)
fcntl.fcntl(fd, fcntl.F_SETFL, old_flags)

View File

@@ -7,9 +7,11 @@ from rich.table import Table
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_put, fshclose
class OMNYError(Exception):
pass
class OMNYOpticsMixin:
@staticmethod
def _get_user_param_safe(device, var):
@@ -18,10 +20,9 @@ class OMNYOpticsMixin:
raise ValueError(f"Device {device} has no user parameter definition for {var}.")
return param.get(var)
def ooptics_in(self):
self.ofzp_in()
#ocs_in
# ocs_in
self.oosa_in()
if "rtx" in dev and dev.rtx.enabled:
@@ -29,14 +30,16 @@ class OMNYOpticsMixin:
self.align.update_frame()
user_input = input("Is the direct beam gone on the xray eye? Do you see the cone of the FZP?")
user_input = input(
"Is the direct beam gone on the xray eye? Do you see the cone of the FZP?"
)
if user_input == "y":
printf("Next oeye_out...\n")
else:
raise OMNYError("Failed to properly move in the Xray optics")
def _oeyey_mv(self, position):
#direction dependent speeds
# direction dependent speeds
if dev.oeyez.get().readback < position:
dev.oeyez.controller.socket_put_confirmed("axspeed[7]=15000")
else:
@@ -55,29 +58,30 @@ class OMNYOpticsMixin:
raise OMNYError("The optics were not moved in. Please do so prior to eyey_out")
self.OMNYTools.printgreen("Oeye is out.")
def oeye_cam_in(self):
if dev.oeyez.get().readback < -80:
umv(dev.oeyez, -50)
if np.fabs(dev.oeyey.get().readback+4.8) > 0.1:
if np.fabs(dev.oeyey.get().readback + 4.8) > 0.1:
self._oeyey_mv(-4.8)
if np.fabs(dev.oeyez.get().readback+2) > 0.1 or np.fabs(dev.oeyex.get().readback) > 0.1:
if np.fabs(dev.oeyez.get().readback + 2) > 0.1 or np.fabs(dev.oeyex.get().readback) > 0.1:
umv(dev.oeyez, -2, dev.oeyex, 0)
#if still too close in z -- safety check
if np.fabs(dev.oeyez.get().readback+2) > 0.1:
# if still too close in z -- safety check
if np.fabs(dev.oeyez.get().readback + 2) > 0.1:
raise OMNYError("The oeye is too close in z for transfer. ERROR! Aborting.")
self.OMNYTools.printgreen("Oeye is at cam position.")
def _oeye_xray_is_in(self) -> bool:
omny_oeye_xray_inx = self._get_user_param_safe("oeyex", "xray_in")
omny_oeye_xray_iny = self._get_user_param_safe("oeyey", "xray_in")
omny_oeye_currentx = dev.oeyex.get().readback
omny_oeye_currenty = dev.oeyey.get().readback
if np.fabs(omny_oeye_currentx - omny_oeye_xray_inx)<0.1 and np.fabs(omny_oeye_currenty - omny_oeye_xray_iny)<0.1:
if (
np.fabs(omny_oeye_currentx - omny_oeye_xray_inx) < 0.1
and np.fabs(omny_oeye_currenty - omny_oeye_xray_iny) < 0.1
):
return True
else:
return False
@@ -86,64 +90,64 @@ class OMNYOpticsMixin:
if self._oeye_xray_is_in():
pass
else:
#todo
# todo
# self._otransfer_gripper_safe_xray_in_operation()
#if(!_oshield_is_ST_closed())
#{
# if(!_oshield_is_ST_closed())
# {
# printf("The shield of the sample stage is not closed. Aborting.\n")
# exit
#}
# }
omny_oeye_xray_inx = self._get_user_param_safe("oeyex", "xray_in")
omny_oeye_xray_iny = self._get_user_param_safe("oeyey", "xray_in")
omny_oeye_xray_inz = self._get_user_param_safe("oeyez", "xray_in")
self._oeyey_mv(omny_oeye_xray_iny)
omny_oeye_currenty = dev.oeyey.get().readback
if np.fabs(omny_oeye_currenty-omny_oeye_xray_iny)>0.1:
if np.fabs(omny_oeye_currenty - omny_oeye_xray_iny) > 0.1:
raise OMNYError("The oeye did not move up.\n")
umv(dev.oeyex, omny_oeye_xray_inx, dev.oeyez, omny_oeye_xray_inz)
self.OMNYTools.printgreen("Oeye is at X-ray position.")
#some notes for the vis microscope:
#initial position for the vis light microscope
#do not open the shield when the microscope is at the vis mic position
#found eoeyx -45.13, z -84.9, y 0.64
#for a samy position of 2.8 with delta off
#the osa position should be in z around 7.4. in x it seems better
#around -0.6, where potentially xrays dont pass anymore
# some notes for the vis microscope:
# initial position for the vis light microscope
# do not open the shield when the microscope is at the vis mic position
# found eoeyx -45.13, z -84.9, y 0.64
# for a samy position of 2.8 with delta off
# the osa position should be in z around 7.4. in x it seems better
# around -0.6, where potentially xrays dont pass anymore
#
def _oosa_check_y(self):
omny_oosa_currenty = dev.oosay.get().readback
if np.fabs(omny_oosa_currenty-0.9)>0.05:
if np.fabs(omny_oosa_currenty - 0.9) > 0.05:
umv(dev.oosay, 0.9)
omny_oosa_currenty = dev.oosay.get().readback
if np.fabs(omny_oosa_currenty-0.9)>0.05:
if np.fabs(omny_oosa_currenty - 0.9) > 0.05:
raise OMNYError("oosay is not around 0.9. Aborting.")
def _oosa_to_move_corridor(self):
self._oosa_check_y()
dev.oosax.limits = [-3, 3.7] #risk collision with shield
dev.oosax.limits = [-3, 3.7] # risk collision with shield
umv(dev.oosax, -2)
dev.oosax.read(cached=False)
omny_oosa_currentx = dev.oosax.get().readback
if np.fabs(omny_oosa_currentx+2)>0.1:
if np.fabs(omny_oosa_currentx + 2) > 0.1:
raise OMNYError("oosax did not reach target position. Not moving in z.\n")
def oosa_in(self):
self._oosa_check_y()
dev.oshield.read(cached=False)
omny_oshield_current = dev.oshield.get().readback
if omny_oshield_current<15:
if omny_oshield_current < 15:
self._oshield_ST_close()
if self.near_field==False:
if self.near_field == False:
x_in_pos = self._get_user_param_safe("oosax", "far_field_in")
y_in_pos = self._get_user_param_safe("oosay", "far_field_in")
z_in_pos = self._get_user_param_safe("oosaz", "far_field_in")
print("OSA movement in far-field mode.")
dev.oosaz.read(cached=False)
omny_oosa_currentz = dev.oosaz.get().readback
if omny_oosa_currentz<6.4:
if omny_oosa_currentz < 6.4:
self._oosa_to_move_corridor()
dev.oosaz.limits = [6.4, 6.6]
umv(dev.oosaz, z_in_pos)
@@ -151,30 +155,30 @@ class OMNYOpticsMixin:
umv(dev.oosay, y_in_pos)
#### For the 30 nm FZP 220 um we use this part
# umv oosaz 6.5
# umv oosax 3.2453
# umv oosax 3.2453
# umv oosay 0.386015
if self.near_field==True:
if self.near_field == True:
x_in_pos = self._get_user_param_safe("oosax", "near_field_in")
y_in_pos = self._get_user_param_safe("oosay", "near_field_in")
z_in_pos = self._get_user_param_safe("oosaz", "near_field_in")
print("OSA movement in near-field mode.")
dev.oosaz.read(cached=False)
omny_oosa_currentz = dev.oosaz.get().readback
if omny_oosa_currentz>0:
if omny_oosa_currentz > 0:
self._oosa_to_move_corridor()
dev.oosaz.limits = [-0.4, -0.6]
umv(dev.oosaz, z_in_pos)
umv(dev.oosaz, z_in_pos)
umv(dev.oosax, x_in_pos)
omny_osamy_current = dev.osamy.get().readback
if omny_osamy_current<3.25:
umv(dev.oosay, y_in_pos)
if omny_osamy_current < 3.25:
umv(dev.oosay, y_in_pos)
else:
raise OMNYError("Failed to move oosa in. osamy position is too large.")
self.OMNYTools.printgreen("OSA is in.")
#todo
# todo
# _omny_interferometer_align_tracking
# rt_feedback_enable
@@ -182,33 +186,33 @@ class OMNYOpticsMixin:
self._oosa_check_y()
dev.oshield.read(cached=False)
omny_oshield_current = dev.oshield.get().readback
if omny_oshield_current<15:
if omny_oshield_current < 15:
self._oshield_ST_close()
omny_oosaz_current = dev.oosaz.get().readback
if self.near_field==False:
if self.near_field == False:
print("OSA movement in far-field mode.")
if omny_oosaz_current<6.4:
if omny_oosaz_current < 6.4:
self._oosa_to_move_corridor()
dev.oosaz.limits = [6.4, 6.6]
umv(dev.oosaz, 6.5)
umv(dev.oosax, -2)
if self.near_field==True:
if self.near_field == True:
print("OSA movement in near-field mode.")
if omny_oosaz_current>0:
if omny_oosaz_current > 0:
self._oosa_to_move_corridor()
dev.oosaz.limits = [-0.4, -0.6]
umv(dev.oosaz, -0.45)
umv(dev.oosax, -2)
#todo _omny_interferometer_align_tracking
# todo _omny_interferometer_align_tracking
self.OMNYTools.printgreen("OSA is out.")
def oosa_move_out_of_shield(self):
#todo: _omnycam_samplestage
# todo: _omnycam_samplestage
self._oosa_check_y()
self._oosa_to_move_corridor()
omny_osamx_current = dev.osamx.get().readback
if np.fabs(omny_osamx_current)>0.2:
if np.fabs(omny_osamx_current) > 0.2:
umv(dev.osamx, 0)
omny_oosaz_current = dev.oosaz.get().readback
if omny_oosaz_current > 0.1:
@@ -217,94 +221,84 @@ class OMNYOpticsMixin:
self.OMNYTools.printgreen("OSA is out of shield.")
def ofzp_out(self):
if "rtx" in dev and dev.rtx.enabled:
dev.rtx.controller.feedback_disable()
y_out_pos = self._get_user_param_safe("ofzpy", "out")
if np.fabs(dev.ofzpy.get().readback-y_out_pos)>0.02:
if np.fabs(dev.ofzpy.get().readback - y_out_pos) > 0.02:
umv(dev.ofzpy, y_out_pos)
self.OMNYTools.printgreen("FZP at out position")
def ofzp_in(self):
if "rtx" in dev and dev.rtx.enabled:
dev.rtx.controller.feedback_disable()
x_in_pos = self._get_user_param_safe("ofzpx", "in")
y_in_pos = self._get_user_param_safe("ofzpy", "in")
if np.fabs(dev.ofzpy.get().readback-y_in_pos)>0.02:
if np.fabs(dev.ofzpy.get().readback - y_in_pos) > 0.02:
umv(dev.ofzpy, y_in_pos)
if np.fabs(dev.ofzpx.get().readback-x_in_pos)>0.02:
if np.fabs(dev.ofzpx.get().readback - x_in_pos) > 0.02:
umv(dev.ofzpx, x_in_pos)
self.OMNYTools.printgreen("FZP at in position")
#220 mu FZP at ofzpz 31.8025 for eiger probe (about 2.4 mm propagation after focus)
# 220 mu FZP at ofzpz 31.8025 for eiger probe (about 2.4 mm propagation after focus)
# umv(dev.ofzpy, 0.7944)
# if np.fabs(dev.ofzpx.get().readback+0.4317)>0.05:
# umv(dev.ofzpx, -0.4317)
#note the 220 fzp also works for near field 6.2 kev by just moving back osa and fzp
#ofzpz 24.8 leads to a 9.5 mm propagation distance.
#With the 220 mu FZP this gives 100 nm pixel recons
#for the oosa macro set near_field=1
#170 mu FZP at 6.2 kev for large beam at ofzpz 31.8025 of about 58 mu diameter
#120 mu FZP at ofzpz 28.1991
# note the 220 fzp also works for near field 6.2 kev by just moving back osa and fzp
# ofzpz 24.8 leads to a 9.5 mm propagation distance.
# With the 220 mu FZP this gives 100 nm pixel recons
# for the oosa macro set near_field=1
# 170 mu FZP at 6.2 kev for large beam at ofzpz 31.8025 of about 58 mu diameter
# 120 mu FZP at ofzpz 28.1991
#250 mu FZP 60 nm at 5.65 keV
#ofzpz 29.7 for propagation distance 2.2
#umv ofzpx -0.4457
#umv ofzpy 0.193630
# 250 mu FZP 60 nm at 5.65 keV
# ofzpz 29.7 for propagation distance 2.2
# umv ofzpx -0.4457
# umv ofzpy 0.193630
#150 um fzp, 60 nm, ofzpz 33.8 at 8.9 kev for propagation of 1.7 mm after focus
#umv ofzpx -0.756678
#umv ofzpy 0.193515
# 150 um fzp, 60 nm, ofzpz 33.8 at 8.9 kev for propagation of 1.7 mm after focus
# umv ofzpx -0.756678
# umv ofzpy 0.193515
# 250 um 30 nm FZP upper right
# small abberrations, seems to give good results in weak objects
# ofzpx -0.609240
# umv ofzpy 0.118265
# 250 um 30 nm FZP lower right very aberated
# ofzpx -0.881935
# umv ofzpy 0.537050
#250 um 30 nm FZP upper right
#small abberrations, seems to give good results in weak objects
#ofzpx -0.609240
#umv ofzpy 0.118265
#250 um 30 nm FZP lower right very aberated
#ofzpx -0.881935
#umv ofzpy 0.537050
# ofzpz 28.4027
# 5.30 mm prop at 8.9 keV, 45 nm pixel in near field
#ofzpz 28.4027
#5.30 mm prop at 8.9 keV, 45 nm pixel in near field
# ofzpz 33.103
# 0.6 mm prop at 8.9 kev far field 7 m flight tube at foptz
#ofzpz 33.103
#0.6 mm prop at 8.9 kev far field 7 m flight tube at foptz
#ofzpz 49.4 is reachable just without interferometer swap
#which at 6.2 keV and 250 um diam, 30 nm should gives a propagation of 0.8 after focus
#and a beam size of 6 microns diamter
# ofzpz 49.4 is reachable just without interferometer swap
# which at 6.2 keV and 250 um diam, 30 nm should gives a propagation of 0.8 after focus
# and a beam size of 6 microns diamter
###coordinates 30 nm FZP for comparing them
#not sure if that is really correct
#FZP 1 - FZP 2
# not sure if that is really correct
# FZP 1 - FZP 2
# FZP 5
#FZP 4 - FZP 1
# FZP 4 - FZP 1
#FZP
# FZP
##upper right
#umv ofzpx -0.6154 ofzpy 0.1183
#umv ocsx -0.6070 ocsy 0.0540
#lower right
#umv ofzpx -0.8341 ofzpy 0.5683
#umv ocsx -0.3880 ocsy -0.3960
#lower left
#umv ofzpx -0.3876 ofzpy 0.7902
#umv ocsx -0.8380 ocsy -0.6180
#upper left
#umv ofzpx -0.1678 ofzpy 0.3403
#umv ocsx -1.0550 ocsy -0.1680
# umv ofzpx -0.6154 ofzpy 0.1183
# umv ocsx -0.6070 ocsy 0.0540
# lower right
# umv ofzpx -0.8341 ofzpy 0.5683
# umv ocsx -0.3880 ocsy -0.3960
# lower left
# umv ofzpx -0.3876 ofzpy 0.7902
# umv ocsx -0.8380 ocsy -0.6180
# upper left
# umv ofzpx -0.1678 ofzpy 0.3403
# umv ocsx -1.0550 ocsy -0.1680
def ofzp_info(self, mokev_val=-1, ofzpz_val=-1):
print(f"{ofzpz_val}")
@@ -319,8 +313,10 @@ class OMNYOpticsMixin:
return
if ofzpz_val == -1:
ofzpz_val = dev.ofzpz.readback.get()
distance = 66+2.4+31.8025-ofzpz_val
print(f"\nThe sample is in a distance of \033[1m{distance:.1f} mm\033[0m from the 60 nm FZP.\n")
distance = 66 + 2.4 + 31.8025 - ofzpz_val
print(
f"\nThe sample is in a distance of \033[1m{distance:.1f} mm\033[0m from the 60 nm FZP.\n"
)
print(f"At the current energy of {mokev_val:.4f} keV we have following options:\n")
diameters = [80e-6, 100e-6, 120e-6, 150e-6, 170e-6, 200e-6, 220e-6, 250e-6]
@@ -345,9 +341,11 @@ class OMNYOpticsMixin:
console.print(table)
#30 nm with additional spacer
distance = 53.84+0.6+33.1-ofzpz_val
print(f"\nThe sample is in a distance of \033[1m{distance:.1f} mm\033[0m from the 30 nm FZP.\n")
# 30 nm with additional spacer
distance = 53.84 + 0.6 + 33.1 - ofzpz_val
print(
f"\nThe sample is in a distance of \033[1m{distance:.1f} mm\033[0m from the 30 nm FZP.\n"
)
diameters = [150e-6, 250e-6]
@@ -372,8 +370,8 @@ class OMNYOpticsMixin:
console.print(table)
print(
"This function can be called with explicit energy and ofzpz position.\n Example: omny.ffzp_info(mokev_val=6.2, ofzpz_val=33.2)"
)
"This function can be called with explicit energy and ofzpz position.\n Example: omny.ffzp_info(mokev_val=6.2, ofzpz_val=33.2)"
)
# from flomni
# oosaz_val = dev.oosaz.readback.get()

View File

@@ -13,22 +13,26 @@ from rich.table import Table
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
class OMNY_rt_clientError(Exception):
pass
class OMNY_rt_client:
def __init__(self):
self.mirror_channel = -1
self.mirror_amplitutde_increase=0
self.mirror_amplitutde_increase = 0
self.mirror_parameters = {}
for j in range(1,9):
for j in range(1, 9):
self.mirror_parameters[j] = dev.rtx.controller.get_mirror_parameters(j)
@staticmethod
def _get_user_param_safe(device, var):
param = dev[device].user_parameter
if not param or param.get(var) is None:
raise OMNY_rt_clientError(f"Device {device} has no user parameter definition for {var}.")
raise OMNY_rt_clientError(
f"Device {device} has no user parameter definition for {var}."
)
return param.get(var)
def _omny_interferometer_openloop_steps(self, channel, steps, amplitude):
@@ -38,7 +42,7 @@ class OMNY_rt_client:
self._tweak_interferometer()
def _tweak_interferometer(self):
self.mirror_channel=-1
self.mirror_channel = -1
# Save the current terminal settings
fd = sys.stdin.fileno()
@@ -60,71 +64,123 @@ class OMNY_rt_client:
try:
# Read single character input
key = sys.stdin.read(1)
if key == 'q':
self.mirror_amplitutde_increase=0
self.mirror_channel=-1
if key == "q":
self.mirror_amplitutde_increase = 0
self.mirror_channel = -1
print("\n\rExiting tweak mode\r")
break
elif key == '\x1b': # Escape sequences for arrow keys
elif key == "\x1b": # Escape sequences for arrow keys
next1, next2 = sys.stdin.read(2)
if next1 == '[':
printit=True
if next2 == 'A':
#print("up")
if next1 == "[":
printit = True
if next2 == "A":
# print("up")
if self.mirror_channel != -1:
self._omny_interferometer_openloop_steps(4, -self.mirror_parameters[self.mirror_channel]["opt_steps2_neg"], self.mirror_parameters[self.mirror_channel]["opt_amplitude2_neg"]+self.mirror_amplitutde_increase)
elif next2 == 'B':
#print(" down")
self._omny_interferometer_openloop_steps(
4,
-self.mirror_parameters[self.mirror_channel][
"opt_steps2_neg"
],
self.mirror_parameters[self.mirror_channel][
"opt_amplitude2_neg"
]
+ self.mirror_amplitutde_increase,
)
elif next2 == "B":
# print(" down")
if self.mirror_channel != -1:
self._omny_interferometer_openloop_steps(4, self.mirror_parameters[self.mirror_channel]["opt_steps2_pos"], self.mirror_parameters[self.mirror_channel]["opt_amplitude2_pos"]+self.mirror_amplitutde_increase)
elif next2 == 'C':
#print("right")
self._omny_interferometer_openloop_steps(
4,
self.mirror_parameters[self.mirror_channel][
"opt_steps2_pos"
],
self.mirror_parameters[self.mirror_channel][
"opt_amplitude2_pos"
]
+ self.mirror_amplitutde_increase,
)
elif next2 == "C":
# print("right")
if self.mirror_channel != -1:
self._omny_interferometer_openloop_steps(3, -self.mirror_parameters[self.mirror_channel]["opt_steps1_neg"], self.mirror_parameters[self.mirror_channel]["opt_amplitude1_neg"]+self.mirror_amplitutde_increase)
elif next2 == 'D':
#print("left")
self._omny_interferometer_openloop_steps(
3,
-self.mirror_parameters[self.mirror_channel][
"opt_steps1_neg"
],
self.mirror_parameters[self.mirror_channel][
"opt_amplitude1_neg"
]
+ self.mirror_amplitutde_increase,
)
elif next2 == "D":
# print("left")
if self.mirror_channel != -1:
self._omny_interferometer_openloop_steps(3, self.mirror_parameters[self.mirror_channel]["opt_steps1_pos"], self.mirror_parameters[self.mirror_channel]["opt_amplitude1_pos"]+self.mirror_amplitutde_increase)
self._omny_interferometer_openloop_steps(
3,
self.mirror_parameters[self.mirror_channel][
"opt_steps1_pos"
],
self.mirror_parameters[self.mirror_channel][
"opt_amplitude1_pos"
]
+ self.mirror_amplitutde_increase,
)
elif key.isdigit() and 1 <= int(key) <= 8:
self.mirror_channel = int(key)
opt_mirrorname = self.mirror_parameters[self.mirror_channel]["opt_mirrorname"]
autostop = self.mirror_parameters[self.mirror_channel]['opt_signal_stop']
averaging_time = self.mirror_parameters[self.mirror_channel]["opt_averaging_time"]
print(f"\nSelected mirror channel {self.mirror_channel}: {opt_mirrorname}. Autostop {autostop}. Signal averaging time: {averaging_time}\r")
opt_mirrorname = self.mirror_parameters[self.mirror_channel][
"opt_mirrorname"
]
autostop = self.mirror_parameters[self.mirror_channel]["opt_signal_stop"]
averaging_time = self.mirror_parameters[self.mirror_channel][
"opt_averaging_time"
]
print(
f"\nSelected mirror channel {self.mirror_channel}: {opt_mirrorname}. Autostop {autostop}. Signal averaging time: {averaging_time}\r"
)
if int(key) == 6:
dev.rtx.controller.laser_tracker_on()
dev.rtx.controller._omny_interferometer_switch_channel(self.mirror_channel)
max=0
printit=True
elif key == '+':
max = 0
printit = True
elif key == "+":
print("\nIncreasing voltage amplitudes by 100.\r")
self.mirror_amplitutde_increase+=100
elif key == '-':
self.mirror_amplitutde_increase += 100
elif key == "-":
print("\nDecreasing voltage amplitudes by 100.\r")
self.mirror_amplitutde_increase-=100
elif key == 'a':
self.mirror_amplitutde_increase -= 100
elif key == "a":
if self.mirror_channel != -1:
dev.rtx.controller._omny_interferometer_optimize(mirror_channel = self.mirror_channel, channel=3)
dev.rtx.controller._omny_interferometer_optimize(mirror_channel = self.mirror_channel, channel=4)
dev.rtx.controller._omny_interferometer_optimize(mirror_channel = self.mirror_channel, channel=3)
dev.rtx.controller._omny_interferometer_optimize(mirror_channel = self.mirror_channel, channel=4)
dev.rtx.controller._omny_interferometer_optimize(
mirror_channel=self.mirror_channel, channel=3
)
dev.rtx.controller._omny_interferometer_optimize(
mirror_channel=self.mirror_channel, channel=4
)
dev.rtx.controller._omny_interferometer_optimize(
mirror_channel=self.mirror_channel, channel=3
)
dev.rtx.controller._omny_interferometer_optimize(
mirror_channel=self.mirror_channel, channel=4
)
if self.mirror_channel != -1 and printit:
printit = False
signal = dev.rtx.controller._omny_interferometer_get_signalsample(self.mirror_parameters[self.mirror_channel]["opt_signalchannel"], self.mirror_parameters[self.mirror_channel]["opt_averaging_time"])
signal = dev.rtx.controller._omny_interferometer_get_signalsample(
self.mirror_parameters[self.mirror_channel]["opt_signalchannel"],
self.mirror_parameters[self.mirror_channel]["opt_averaging_time"],
)
if signal > max:
max = signal
info_str = f"Channel {self.mirror_channel}, {opt_mirrorname}, Current signal: {signal:.0f}"
filling = " " * (50-len(info_str))
filling = " " * (50 - len(info_str))
# Calculate the number of filled and unfilled segments
length = 30
percentage = signal / max
filled_length = int(length * percentage)
unfilled_length = length - filled_length
bar = '#' * filled_length + '-' * unfilled_length
print(info_str + filling + "0 " + bar + f" {max:.0f} (q)uit\r", end='')
bar = "#" * filled_length + "-" * unfilled_length
print(info_str + filling + "0 " + bar + f" {max:.0f} (q)uit\r", end="")
except IOError:
# No input available, keep looping
@@ -147,9 +203,15 @@ class OMNY_rt_client:
def omny_interferometer_align_incoupling_angle(self):
dev.rtx.controller.omny_interferometer_align_incoupling_angle()
def interferometer_tweak_otrack(self):
self.OMNYTools.tweak_cursor(dev.otrackz,.1,dev.otracky,.1,special_command=dev.rtx.controller.laser_tracker_print_intensity_for_otrack_tweaking)
self.OMNYTools.tweak_cursor(
dev.otrackz,
0.1,
dev.otracky,
0.1,
special_command=dev.rtx.controller.laser_tracker_print_intensity_for_otrack_tweaking,
)
def feedback_enable_with_reset(self):
dev.rtx.controller.feedback_enable_with_reset()
@@ -177,4 +239,3 @@ class OMNY_rt_client:
def laser_tracker_check_and_wait_for_signalstrength(self):
dev.rtx.controller.laser_tracker_check_and_wait_for_signalstrength()

View File

@@ -22,7 +22,7 @@ if TYPE_CHECKING:
class XrayEyeAlign:
# pixel calibration, multiply to get mm
PIXEL_CALIBRATION = 0.2/218 # .2 with binning
PIXEL_CALIBRATION = 0.2 / 218 # .2 with binning
def __init__(self, client, omny: OMNY) -> None:
self.client = client