2745 lines
97 KiB
Python
Executable File
2745 lines
97 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# *-----------------------------------------------------------------------*
|
|
# | |
|
|
# | Copyright (c) 2022 by Paul Scherrer Institute (http://www.psi.ch) |
|
|
# | Based on Zac great first implementation |
|
|
# | Author Thierry Zamofing (thierry.zamofing@psi.ch) |
|
|
# *-----------------------------------------------------------------------*
|
|
"""
|
|
SwissMx experiment application.
|
|
Based on Zac great first implementation
|
|
|
|
|
|
bitmask for simulation:
|
|
0x001: backlight
|
|
0x002: illumination
|
|
0x004: zoom
|
|
0x008: camera
|
|
0x010: EPICS motors Deltatau
|
|
0x020: EPICS motors SmarAct
|
|
0x040: shutter
|
|
0x080: Jungfrau
|
|
0x100: Deltatau motion code
|
|
|
|
"""
|
|
import logging,time,os,sys,socket
|
|
class col:
|
|
d='\033[0m' # default
|
|
r='\033[31m' # red
|
|
g='\033[32m' # green
|
|
y='\033[33m' # yellow
|
|
rr='\033[91m' # red(bright)
|
|
gg='\033[92m' # green(bright)
|
|
yy='\033[93m' # yellow(bright)
|
|
b='\033[1m' # bold
|
|
u='\033[4m' # underline
|
|
|
|
def listLoggers():
|
|
rootlogger=logging.getLogger()
|
|
print(rootlogger)
|
|
for h in rootlogger.handlers:
|
|
print(' %s'%h)
|
|
|
|
for nm, lgr in logging.Logger.manager.loggerDict.items():
|
|
print('+ [%-20s] %s '%(nm, lgr))
|
|
if not isinstance(lgr, logging.PlaceHolder):
|
|
for h in lgr.handlers:
|
|
print(' %s'%h)
|
|
|
|
class logHandler(logging.StreamHandler):
|
|
def __init__(self):
|
|
logging.StreamHandler.__init__(self)
|
|
|
|
def emit(self, record):
|
|
'''override function of base class'''
|
|
try:
|
|
msg=self.format(record)
|
|
# print(record.__dict__)
|
|
if record.levelno<=10:
|
|
c=col.g
|
|
elif record.levelno<=20:
|
|
c=col.y
|
|
elif record.levelno<=30:
|
|
c=col.yy
|
|
elif record.levelno<=40:
|
|
c=col.r
|
|
else:
|
|
c=col.rr+col.b
|
|
msg=c+msg+col.d
|
|
stream=self.stream
|
|
stream.write(msg+self.terminator)
|
|
self.flush()
|
|
except RecursionError:
|
|
raise
|
|
except Exception:
|
|
self.handleError(record)
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(levelname)s:%(module)s:%(lineno)d:%(funcName)s:%(message)s',
|
|
handlers=[logHandler()])
|
|
|
|
_log = logging.getLogger("swissmx")
|
|
|
|
if __name__=="__main__":
|
|
if socket.gethostname()=='ganymede':
|
|
base=os.path.abspath(os.path.dirname(__file__))
|
|
sys.path.insert(0, os.path.abspath(os.path.join(base,'../PBSwissMX/python')))
|
|
sys.path.insert(0, os.path.abspath(os.path.join(base,'../../PBTools')))
|
|
else:
|
|
base=os.path.abspath(os.path.dirname(__file__))
|
|
sys.path.insert(0, os.path.join(base,'PBTools'))
|
|
sys.path.insert(0, os.path.join(base,'PBSwissMX/python'))
|
|
|
|
|
|
class timestamp():
|
|
def __init__(self):
|
|
self.t=time.time()
|
|
def log(self,txt):
|
|
t=time.time()
|
|
print(txt+f'{t-self.t:6.3g}')
|
|
self.t=t
|
|
ts=timestamp()
|
|
ts.log('Import part 1/8:')
|
|
import sys, os
|
|
import yaml
|
|
import signal, subprocess
|
|
import matplotlib as mpl
|
|
import matplotlib.pyplot as plt
|
|
|
|
mpl.use('Qt5Agg') # needed to avoid blocking of ui !
|
|
|
|
TASK_JUNGFRAU_SETTINGS = "jungfrau_settings"
|
|
TASK_SETUP_GEOMETRY_CALIB = "geometry_calib"
|
|
TASK_SETUP_PPM_CALIBRATION_TBOX = "ppm_calibration_tbox"
|
|
TASK_SETUP_BEAM_CENTER = "beamcenter_calibration"
|
|
TASK_SETUP_CAMERA = "setup_camera"
|
|
TASK_SETUP_ROI = "setup_rois"
|
|
TASK_SAMPLE_SELECTION = "task_sample_selection"
|
|
TASK_SCREENING = "screening"
|
|
TASK_FIX_TARGET = "fix_trg"
|
|
TASK_GRID = "grid"
|
|
TASK_PRELOCATED = "prelocated"
|
|
TASK_HELICAL = "helical"
|
|
TASK_EMBL = "embl"
|
|
ts.log('Import part 2/8:')
|
|
import ModuleFixTarget
|
|
|
|
ts.log('Import part 3/8:')
|
|
import qtawesome
|
|
import qutilities
|
|
from PyQt5 import QtCore, QtGui
|
|
from PyQt5.QtCore import Qt, pyqtSlot, QSize, pyqtSignal, QObject, QRectF,QT_VERSION_STR
|
|
from PyQt5.QtGui import QKeySequence, QPixmap
|
|
from PyQt5.QtWidgets import (
|
|
QAction, QApplication, QFileDialog, QGridLayout, QHBoxLayout, QLabel, QLineEdit,
|
|
QMessageBox, QProgressBar, QPushButton, QShortcut, QSizePolicy,
|
|
QSplashScreen, QToolBox, QVBoxLayout, QWidget,)
|
|
from PyQt5.uic import loadUiType
|
|
ts.log('Import part 4/8:')
|
|
import pyqtUsrObj as UsrGO
|
|
from epics_widgets.MotorTweak import MotorTweak
|
|
from epics_widgets.SimMotorTweak import SimMotorTweak
|
|
ts.log('Import part 5/8:')
|
|
import numpy as np
|
|
np.set_printoptions(suppress=True,linewidth=196)
|
|
import pyqtgraph as pg
|
|
import pyqtgraph.exporters
|
|
# use antialias for draw lines, interpret image data as row-major instead of col-major
|
|
pg.setConfigOptions(antialias=True,imageAxisOrder='row-major')
|
|
ts.log('Import part 6/8:')
|
|
import app_utils
|
|
from app_config import AppCfg,WndParameter #settings, option, toggle_option
|
|
|
|
import epics
|
|
import camera,backlight,zoom,illumination,geometry
|
|
ts.log('Import part 7/8:')
|
|
import psi_device
|
|
ts.log('Import part 8/8:')
|
|
|
|
def tdstamp():
|
|
return time.strftime("%Y%m%dH%H%M%S")
|
|
|
|
def datestamp():
|
|
return time.strftime("%Y%m%d")
|
|
|
|
def get_version(path='.'):
|
|
#sys.stdout.write('getVersion() -> using git command -> ')
|
|
p = subprocess.Popen(f'git -C {path} describe --match ''*.*.*'' --long --tags --dirty', shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
|
retval = p.wait()
|
|
res=p.stdout.readline()
|
|
p.stdout.close()
|
|
#res=res.decode()[1:-1].split('-',2)
|
|
res=res.decode()[:-1].split('-',2)
|
|
ver='.'.join(res[:2])
|
|
gitcmt=res[2][1:]
|
|
return (ver,gitcmt)
|
|
|
|
def sigint_handler(*args):
|
|
"""Handler for the SIGINT signal."""
|
|
app=QApplication.instance()
|
|
app.quit()
|
|
|
|
class AcquisitionAbortedException(Exception):
|
|
pass
|
|
|
|
class Sequencer(QObject):
|
|
finished = pyqtSignal()
|
|
timeoutExpired = pyqtSignal()
|
|
errorHappened = pyqtSignal(str)
|
|
|
|
def __init__(self, steps):
|
|
super().__init__()
|
|
self.steps = steps
|
|
|
|
def run_sequence(self):
|
|
num_steps = len(self.steps)
|
|
for n, step in enumerate(self.steps):
|
|
_log.info("runing step {}/{}".format(n, num_steps))
|
|
step()
|
|
_log.info("emiting finished")
|
|
self.finished.emit()
|
|
|
|
|
|
class StartupSplash:
|
|
|
|
def __init__(self):
|
|
splash_pix = QPixmap("artwork/logo/256x256.png")
|
|
self._wnd=splash = QSplashScreen(splash_pix, Qt.WindowStaysOnTopHint)
|
|
splash.setWindowFlags(Qt.WindowStaysOnTopHint | Qt.FramelessWindowHint)
|
|
splash.setEnabled(False)
|
|
|
|
self._prgsBar=prgs=QProgressBar(splash)
|
|
prgs.setMaximum(100)
|
|
prgs.setGeometry(0, splash_pix.height() - 50, splash_pix.width(), 20)
|
|
|
|
splash.show()
|
|
|
|
def set(self,i,msg):
|
|
self._prgsBar.setValue(i)
|
|
self._wnd.showMessage(f"<font color='red'>{msg}</font></h1>", int(Qt.AlignBottom|Qt.AlignCenter), Qt.black)
|
|
app=QApplication.instance()
|
|
app.processEvents()
|
|
time.sleep(.1)
|
|
|
|
Ui_MainWindow, QMainWindow = loadUiType(os.path.join(os.path.dirname(__file__),"swissmx.ui"))
|
|
|
|
class WndSwissMx(QMainWindow, Ui_MainWindow):
|
|
sigNewCamImg = pyqtSignal() # index in self._grids
|
|
|
|
#pixelsPerMillimeter = pyqtSignal(float)
|
|
#beamCameraCoordinatesChanged = pyqtSignal(float, float)
|
|
addGridRequest = pyqtSignal(float, float)
|
|
#zoomChanged = pyqtSignal(float)
|
|
#folderChanged = pyqtSignal(str)
|
|
prefixChanged = pyqtSignal(str)
|
|
projectChanged = pyqtSignal(str)
|
|
gridUpdated = pyqtSignal(int) # index in self._grids
|
|
gonioMoveRequest = pyqtSignal(float, float, float, float, float)
|
|
#fast_x_position = pyqtSignal(float)
|
|
#fast_y_position = pyqtSignal(float)
|
|
#fast_dx_position = pyqtSignal(float)
|
|
#fast_dy_position = pyqtSignal(float)
|
|
fiducialPositionSelected = pyqtSignal(float, float, float, float) # camx, camy, gonx, gony
|
|
appendPrelocatedPosition = pyqtSignal(float, float, float, float)
|
|
appendPrelocatedFiducial = pyqtSignal(bool, float, float, float, float)
|
|
increaseRunNumberRequest = pyqtSignal()
|
|
daqAborted = pyqtSignal()
|
|
|
|
def __init__(self,):
|
|
super(WndSwissMx, self).__init__()
|
|
self.setupUi(self)
|
|
|
|
app=QApplication.instance()
|
|
|
|
if os.getenv("DEVELOPMENT_VERSION"):
|
|
title = "[DEVELOPMENT] SwissMX - Solid support scanning for FEL sources"
|
|
else:
|
|
title = "SwissMX - Solid support scanning for FEL sources"
|
|
self.setWindowTitle(title)
|
|
|
|
self._do_quit = False
|
|
path=os.path.join(os.path.dirname(__file__),"fonts")
|
|
qtawesome.load_font("material", "MaterialIcons-Regular.ttf", "MaterialIcons-Regular.json", path)
|
|
|
|
QtGui.QFontDatabase.addApplicationFont("fonts/Inconsolata-Bold.ttf")
|
|
QtGui.QFontDatabase.addApplicationFont("fonts/Baloo-Regular.ttf")
|
|
|
|
self.init_settings()
|
|
|
|
# self.create_escape_toolbar()
|
|
self.tweakers = {}
|
|
self.setup_sliders()
|
|
|
|
self.init_graphics()
|
|
|
|
self._esc_state ="Maintenance"
|
|
self._pin_mounting_offset = 0.0
|
|
|
|
self._mouse_tracking = False
|
|
self._rois = []
|
|
|
|
# add a generic ellipse
|
|
# pen = QtGui.QPen(QColor(Qt.yellow))
|
|
# pen.setWidth(5)
|
|
# ellipse1 = QtWidgets.QGraphicsEllipseItem(650, 700, 500, 300)
|
|
# ellipse1.setFlag(QGraphicsItem.ItemIsSelectable)
|
|
# ellipse1.setFlag(QGraphicsItem.ItemIsMovable)
|
|
# ellipse1.setPen(pen)
|
|
# self.vb.addItem(ellipse1)
|
|
|
|
# self.vb.addItem(Grid(pos=(650, 700), size=(100, 200), cell_size=(3, 4), parent=self))
|
|
# self.vb.addItem(Retangulo(pos=(750, 700), size=(100, 200), pen=pg.fn.mkPen(color=[230, 150, 200], width=4.7)))
|
|
# self.vb.addItem(pg.PolyLineROI([[80, 60], [90, 30], [60, 40]], pen=(6,9), closed=True))
|
|
|
|
# test
|
|
# x2 = np.linspace(-100, 100, 1000)
|
|
# data2 = np.sin(x2) / x2
|
|
# # p8 = self.glw.addPlot(row=1, col=0, title="Region Selection")
|
|
# plitem = pg.PlotItem()
|
|
# plitem.plot(data2, pen=(255, 255, 255, 200))
|
|
# # lr = pg.LinearRegionItem([400, 700])
|
|
# # lr.setZValue(-10)
|
|
# self.vb.addItem(plitem)
|
|
|
|
self._message_critical_fault = QLabel(None)
|
|
self._message_critical_fault.setAccessibleName("device_fault")
|
|
self.statusbar.insertWidget(0, self._message_critical_fault)
|
|
self._lb_coords=lbl=QLabel(None)
|
|
#f=QFont('monospace', 10)
|
|
#lbl.setFont(f)
|
|
self.statusbar.addPermanentWidget(lbl)
|
|
|
|
self._fel_status = QLabel(None)
|
|
self._fel_status.setAccessibleName("fel_status_statusbar")
|
|
self.statusbar.addPermanentWidget(self._fel_status)
|
|
#swissfel.statusStringUpdated.connect(self._fel_status.setText) #ZAC: orig. code
|
|
self._status_task = QLabel(None)
|
|
self._status_task.setAccessibleName("status_task_label")
|
|
self.statusbar.addPermanentWidget(self._status_task)
|
|
|
|
#self.add_beam_marker()
|
|
#self._beam_markers = {}
|
|
|
|
# ppm calibration
|
|
#self._zoom_to_ppm = {}
|
|
#self._ppm_click = None
|
|
|
|
self.load_stylesheet()
|
|
|
|
#self.folderChanged.connect(lambda t: print("folder now: {}".format(t))) # FIXME
|
|
|
|
self.init_actions()
|
|
#self.prepare_microscope_page()
|
|
self.prepare_wd_tabs_left()
|
|
#self.update_beam_marker(qoptic_zoom.get_sp()) #ZAC: orig. code
|
|
self._wd_stack.setCurrentIndex(0)
|
|
self._wd_stack.currentChanged.connect(self.cb_update_center_widget)
|
|
#self._OLD_init_validators()
|
|
#self.init_settings_tracker() ? not needed, was for TELL ?
|
|
#self._OLD_wire_storage()
|
|
|
|
self.cb_update_center_widget(0) # start camera updater
|
|
curzoom = app._zoom.get_val()
|
|
_log.debug(f"starting app with zoom at {curzoom}")
|
|
self.cb_zoom_changed(curzoom)
|
|
self._tabs_daq_methods.currentChanged.connect(self.cb_switch_task)
|
|
self.cb_switch_task()
|
|
|
|
def init_settings(self):
|
|
app = QApplication.instance()
|
|
cfg = app._cfg
|
|
keys = cfg.allKeys()
|
|
|
|
#if EXPERIMENT_PGROUP not in keys:
|
|
# self.update_user_and_storage()
|
|
|
|
if cfg.WINDOW_GEOMETRY in keys:
|
|
self.restoreGeometry(cfg.value(cfg.WINDOW_GEOMETRY, ""))
|
|
self.restoreState(cfg.value(cfg.WINDOW_STATE, ""))
|
|
|
|
if cfg.WINDOW_SPLITTER in keys:
|
|
sz = [int(s) for s in cfg.value(cfg.WINDOW_SPLITTER)]
|
|
else:
|
|
sz=(500, 1200)
|
|
self._wd_splitter.setSizes(sz)
|
|
|
|
#if AppCfg.BEAM_MARKER_POSITIONS in keys:
|
|
# self._beam_markers = s.value(AppCfg.BEAM_MARKER_POSITIONS)
|
|
# self.update_beam_marker_fitters()
|
|
# _log.info("read beam markers {}".format(self._beam_markers))
|
|
|
|
#if AppCfg.CAMERA_TRANSFORMATIONS in keys:
|
|
# app._camera.set_transformations(s.value(AppCfg.CAMERA_TRANSFORMATIONS))
|
|
|
|
#if AppCfg.CAMERA_ZOOM_TO_PPM in keys:
|
|
# self._zoom_to_ppm = s.value(AppCfg.CAMERA_ZOOM_TO_PPM)
|
|
# _log.info(f"{AppCfg.CAMERA_ZOOM_TO_PPM} updated: {self._zoom_to_ppm}")
|
|
# self.update_ppm_fitters()
|
|
|
|
def init_graphics(self):
|
|
app = QApplication.instance()
|
|
cfg = app._cfg
|
|
geo = app._geometry
|
|
self.glw = pg.GraphicsLayoutWidget()
|
|
self.microscope_page.setLayout(QVBoxLayout())
|
|
self.microscope_page.layout().addWidget(self.glw)
|
|
self.glw.show()
|
|
self.glw.scene().sigMouseMoved.connect(self.cb_mouse_move)
|
|
self.glw.scene().sigMouseClicked.connect(self.cb_mouse_click)
|
|
|
|
#--- viewbox ---
|
|
self.vb=vb=self.glw.addViewBox(invertY=False,border='r',enableMenu=True)
|
|
#TODO: vb.enableAutoRange(enable=True), vb.autoRange() does not work for ItemGroups
|
|
#therefore set the vieweRange manually
|
|
pad=10
|
|
vb.setRange(QRectF(-1200-pad,-1000-pad,1200+2*pad,1000+2*pad))
|
|
|
|
vb.setAspectLocked(True)
|
|
vb.setBackgroundColor((120, 90, 90))
|
|
|
|
tr=QtGui.QTransform() # prepare ImageItem transformation:
|
|
opt_ctr=app._geometry._opt_ctr
|
|
|
|
#--- image group ---
|
|
# uses image transformation
|
|
# contains image and opticalcenter
|
|
self._goImgGrp=grp=pg.ItemGroup()
|
|
self.vb.addItem(grp)
|
|
trf=cfg.value(AppCfg.GEO_CAM_TRF)
|
|
# be aware: setTransform is transposed!
|
|
# Qt uses: p'=(p.T*A.T).T , but I am used: p'=A*p, with p=[x,y,1].T
|
|
tr.setMatrix(trf[0,0],trf[1,0],trf[2,0], #(-1, 0, 0,
|
|
trf[0,1],trf[1,1],trf[2,1], # 0,-1, 0,
|
|
trf[0,2],trf[1,2],trf[2,2]) # 0, 0, 1)
|
|
grp.setTransform(tr) # assign transform
|
|
|
|
#--- image ---
|
|
self._goImg=img=pg.ImageItem() #border=pg.mkPen('r',width=2))
|
|
grp.addItem(img)
|
|
|
|
#--- opctical center ----
|
|
oc_sz=np.array((50,50))
|
|
#self._goOptCtr=obj=UsrGO.Marker(-opt_ctr+oc_sz/2, oc_sz,mode=1)
|
|
self._goOptCtr=obj=UsrGO.Marker(opt_ctr-oc_sz/2, oc_sz,mode=1, movable=False)
|
|
obj.sigRegionChangeFinished.connect(self.cb_marker_moved)
|
|
grp.addItem(obj)
|
|
|
|
#--- grid ---
|
|
try:
|
|
self._goGrid=grid=pg.GridItem(pen=(0,255,0),textPen=(0,255,0)) #green grid and labels
|
|
except NameError:
|
|
_log.debug('workaround for typo in pyqtgraph:0.11.0')
|
|
from PyQt5.QtGui import QPen,QColor
|
|
self._goGrid=grid=pg.GridItem() # green grid and labels
|
|
grid.opts['pen']=QPen(QColor(0, 255, 0))
|
|
grid.opts['textPen']=QPen(QColor(0, 255, 0))
|
|
#tr.reset()
|
|
#grid.setTransform(tr) # assign transform
|
|
vb.addItem(grid)
|
|
|
|
#--- fixed group ---
|
|
# uses pix2pos transformation with a fixed fx,fy value =(0,0)
|
|
# contains beam marker
|
|
self._goFixGrp=grp=pg.ItemGroup()
|
|
|
|
geo.interp_zoom(1)
|
|
pix2pos=geo._pix2pos
|
|
A=np.asarray(pix2pos.I)
|
|
tr=grp.transform()
|
|
p1=np.hstack((opt_ctr,1)) #position of optical center on image item
|
|
p2=np.matmul(trf, p1) #position of optical center on viewbox
|
|
tr.setMatrix(A[0,0], A[0,1], 0,
|
|
A[1,0], A[1,1], 0,
|
|
p2[0], p2[1], 1) # translate dx,dy
|
|
grp.setTransform(tr)
|
|
self.vb.addItem(grp)
|
|
|
|
#--- beam marker ---
|
|
size_eu=cfg.value(AppCfg.GEO_BEAM_SZ)/1000 # convert from um to mm
|
|
pos_eu=cfg.value(AppCfg.GEO_BEAM_POS)/1000 # convert from um to mm
|
|
self._goBeamMarker=obj=UsrGO.Marker(pos_eu-size_eu/2,size_eu,mode=0,movable=False)
|
|
obj.sigRegionChangeFinished.connect(self.cb_marker_moved)
|
|
#bm.setTransform(tr) # assign transform
|
|
grp.addItem(obj)
|
|
|
|
|
|
#--- testing scan grid ---
|
|
#self.track_objects() # first call is needed to initialize the structure self._goTracked
|
|
# #go=UsrGO.Grid((120, -100), (200, 150), (30, 22), 2)
|
|
# go=UsrGO.Grid((120, -100), (1000, 500), (10, 5), 2)
|
|
# go.setTransform(tr) # assign transform
|
|
# vb.addItem(go)
|
|
# self._goTracked['objLst'].append(go)
|
|
# self.track_objects() #tracking now the objects
|
|
# #UsrGO.obj_tree(vb)
|
|
|
|
def init_actions(self):
|
|
app = QApplication.instance()
|
|
cfg = app._cfg
|
|
self.toolBar.setToolButtonStyle(Qt.ToolButtonTextUnderIcon)
|
|
|
|
self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.SHIFT + Qt.Key_S), self)
|
|
self.shortcut.activated.connect(self.cb_save_cam_image)
|
|
|
|
self.shortcut = QShortcut(QKeySequence(Qt.Key_F2), self)
|
|
self.shortcut.activated.connect(self.cb_save_cam_image)
|
|
|
|
self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.Key_S), self)
|
|
self.shortcut.activated.connect(self.cb_save_cam_image)
|
|
|
|
for k in range(10):
|
|
qkey = k + Qt.Key_0
|
|
self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.SHIFT + qkey), self)
|
|
self.shortcut.activated.connect(lambda key=k: self.saveBookmark(key))
|
|
|
|
self.shortcut = QShortcut(QKeySequence(Qt.CTRL + qkey), self)
|
|
self.shortcut.activated.connect(lambda key=k: self.gotoBookmark(key))
|
|
|
|
self._button_collect.clicked.connect(self.cb_execute_collection)
|
|
|
|
# Toolbar buttons
|
|
icon_size = QSize(50, 50)
|
|
|
|
icon = qtawesome.icon("material.photo_camera")
|
|
action = QAction(icon, "Save Original", self)
|
|
action.setToolTip("(Ctrl+Shift+S) Take a screenshot of the sample image, without markers. Saves in current folder.")
|
|
action.triggered.connect(self.cb_save_cam_image)
|
|
self.toolBar.addAction(action)
|
|
|
|
action = QAction(icon, "Save View", self)
|
|
action.setToolTip("(Ctrl+S) Take a screenshot of the currently visible sample image, including markers. Saves in current folder.")
|
|
action.triggered.connect(lambda: self.cb_save_cam_image(True))
|
|
self.toolBar.addAction(action)
|
|
|
|
if os.getenv("DEVELOPMENT_VERSION"):
|
|
icon = qtawesome.icon("material.style")
|
|
action = QAction(icon, "Reload Stylesheet", self)
|
|
action.triggered.connect(self.load_stylesheet)
|
|
self.toolBar.addAction(action)
|
|
|
|
# icon = qtawesome.icon("material.shutter_speed")
|
|
#action = QAction("Use Pulse Picker", self)
|
|
#action.setCheckable(True)
|
|
#action.setChecked(cfg.option(AppCfg.ACTIVATE_PULSE_PICKER))
|
|
#action.triggered.connect(lambda: cfg.toggle_option(AppCfg.ACTIVATE_PULSE_PICKER))
|
|
#self.toolBar.addAction(action)
|
|
|
|
#icon = qtawesome.icon("material.timeline")
|
|
#action = QAction(icon, "Add Line", self)
|
|
#action.triggered.connect(self._OLD_roi_add_line)
|
|
#self.toolBar.addAction(action)
|
|
|
|
#icon = qtawesome.icon("material.tab_unselected")
|
|
#action = QAction(icon, "Add Rect", self)
|
|
#action.triggered.connect(self._OLD_roi_add_rect)
|
|
#self.toolBar.addAction(action)
|
|
|
|
#icon = qtawesome.icon("material.play_for_work")
|
|
#action = QAction(icon, "TELL Mount", self)
|
|
#action.setToolTip("BookMark(0) => Move stages CZ, CX, FX, FY, and Omega to Tell sample changer mount position")
|
|
#action.triggered.connect(self._OLD_escape_goToTellMountPosition)
|
|
#self.toolBar.addAction(action)
|
|
|
|
icon = qtawesome.icon("material.play_for_work")
|
|
action = QAction(icon, "Auto\nFocus", self)
|
|
action.triggered.connect(self.cb_autofocus)
|
|
self.toolBar.addAction(action)
|
|
|
|
action = QAction(icon, "Find\nFiducial", self)
|
|
action.triggered.connect(self.cb_find_fiducial)
|
|
self.toolBar.addAction(action)
|
|
|
|
action = QAction(icon, "Test\nCode", self)
|
|
action.triggered.connect(self.cb_testcode)
|
|
self.toolBar.addAction(action)
|
|
|
|
self.toolBar.addWidget(qutilities.horiz_spacer())
|
|
|
|
icon = qtawesome.icon("material.sync")
|
|
action = QAction(icon, "Sample\nExchange", self)
|
|
action.setToolTip("Move devices so a sample can be exchanged.")
|
|
action.setObjectName("action_SampleExchange")
|
|
action.triggered.connect(self.cb_esc_sample_exchange)
|
|
self.toolBar.addAction(action)
|
|
self.toolBar.widgetForAction(action).setAccessibleName("action_SampleExchange")
|
|
|
|
icon = qtawesome.icon("material.my_location")
|
|
action = QAction(icon, "Sample\nAlignment", self)
|
|
action.setToolTip("Move devices so a sample can be aligned.")
|
|
action.triggered.connect(self.cb_esc_sample_alignment)
|
|
self.toolBar.addAction(action)
|
|
self.toolBar.widgetForAction(action).setAccessibleName("action_SampleAlignment")
|
|
|
|
icon = qtawesome.icon("material.fingerprint")
|
|
action = QAction(icon, "Data\nCollection", self)
|
|
action.setToolTip("Move devices so a sample can be collected.")
|
|
action.triggered.connect(self.cb_esc_data_collection)
|
|
self.toolBar.addAction(action)
|
|
self.toolBar.widgetForAction(action).setAccessibleName("action_DataCollection")
|
|
|
|
self.actionQuit.triggered.connect(self.cb_really_quit)
|
|
self.actionPreferences.triggered.connect(self.cb_modify_app_param)
|
|
self.actionHome_Fast_Stages.triggered.connect(self.cb_deltatau_home_faststages)
|
|
self.actionAbout.triggered.connect(self.cb_about)
|
|
|
|
#self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.SHIFT + Qt.Key_L), self)
|
|
#self.shortcut.activated.connect(self._OLD_roi_add_line)
|
|
|
|
#self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.SHIFT + Qt.Key_R), self)
|
|
#self.shortcut.activated.connect(self._OLD_roi_add_rect)
|
|
|
|
#self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.SHIFT + Qt.Key_M), self)
|
|
#self.shortcut.activated.connect(self._OLD_toggle_mouse_tracking)
|
|
|
|
#self.shortcut = QShortcut(QKeySequence(Qt.Key_F11), self)
|
|
#self.shortcut.activated.connect(self._OLD_toggle_maximized)
|
|
|
|
#self.shortcut = QShortcut(QKeySequence(Qt.Key_F12), self)
|
|
#self.shortcut.activated.connect(self._OLD_show_window_configuration)
|
|
|
|
#self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.Key_B), self)
|
|
#self.shortcut.activated.connect(lambda: self._OLD_beammark.toggle_handle())
|
|
|
|
#self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.Key_P), self)
|
|
#self.shortcut.activated.connect(self._OLD_camera_pause_toggle)
|
|
|
|
# adding a menu entry to one of the menus
|
|
#action = QAction("parameters", self)
|
|
#action.setToolTip("modify application parameters")
|
|
#action.setStatusTip("modify application parameters")
|
|
#action.triggered.connect(self.cb_modify_app_param)
|
|
#self.menuAdvanced.addAction(action)
|
|
|
|
#action = QAction("geometry", self)
|
|
#action.setToolTip("Update optical center, beam marker size etc.")
|
|
#action.setStatusTip("Update optical center, beam marker size etc.")
|
|
#action.triggered.connect(lambda x: cfg.dlg_geometry(self))
|
|
#self.menuAdvanced.addAction(action)
|
|
|
|
#action = QAction("Backlight Reference Positions", self)
|
|
#action.setToolTip("Update the reference positions for the backlight")
|
|
#action.setStatusTip("Update the reference positions for the backlight")
|
|
#action.triggered.connect(cfg.dlg_backlight_positions)
|
|
#self.menuAdvanced.addAction(action)
|
|
|
|
#action = QAction("Collimator Reference Positions", self)
|
|
#action.setToolTip("Update the reference positions for the collimator")
|
|
#action.setStatusTip("Update the reference positions for the collimator")
|
|
#action.triggered.connect(cfg.dlg_collimator_reference_positions)
|
|
#self.menuAdvanced.addAction(action)
|
|
|
|
#action = QAction("Post sample tube Reference Positions", self)
|
|
#action.setToolTip("Update the reference positions for the post tube")
|
|
#action.setStatusTip("Update the reference positions for the post tube")
|
|
#action.triggered.connect(cfg.dlg_posttube_references)
|
|
#self.menuAdvanced.addAction(action)
|
|
|
|
#action = QAction("Delta Tau Parameters", self)
|
|
#action.setToolTip("Parameters affecting the Delta Tau")
|
|
#action.setStatusTip("Parameters affecting the Delta Tau")
|
|
#action.triggered.connect(cfg.dlg_deltatau_parameters)
|
|
#self.menuAdvanced.addAction(action)
|
|
|
|
#action = QAction("Cryojet Reference Positions", self)
|
|
#action.setToolTip("Update the reference positions for the cryojet nozzle position")
|
|
#action.setStatusTip("Update the reference positions for the cryojet nozzle position")
|
|
#action.triggered.connect(self.set_cryojet_positions_dialog)
|
|
#self.menuAdvanced.addAction(action)
|
|
|
|
#action = QAction("Tell Mount Positions", self)
|
|
#action.setToolTip("Mount positions for TELL sample changer")
|
|
#action.setStatusTip("Parameters affecting the Delta Tau")
|
|
#action.triggered.connect(cfg.dlg_tell_mount_positions)
|
|
#self.menuAdvanced.addAction(action)
|
|
|
|
def closeEvent(self, event): #overloaded function
|
|
"""this is called when the user clicks the window's cross icon"""
|
|
if (self._do_quit or QMessageBox.question( self, "", "Are you sure you want to quit?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No,) == QMessageBox.Yes):
|
|
app=QApplication.instance()
|
|
cfg=app._cfg
|
|
try:
|
|
pv=app._camera._pv['pic']
|
|
pv.clear_auto_monitor() # disconnect PV monitor callback -> program exit faster.
|
|
except AttributeError:
|
|
_log.warning('disconnect PV callback failed.')
|
|
|
|
_log.info('disconnect PV callback')
|
|
cfg.setValue(cfg.WINDOW_GEOMETRY, self.saveGeometry())
|
|
cfg.setValue(cfg.WINDOW_STATE, self.saveState())
|
|
cfg.setValue(cfg.WINDOW_SPLITTER, self._wd_splitter.sizes())
|
|
cfg.setValue('last_active', time.time())
|
|
_log.info('save settings')
|
|
#QMainWindow.closeEvent(self, event)
|
|
_log.info('closeEvent done')
|
|
else:
|
|
event.ignore()
|
|
|
|
def cb_update_center_widget(self, index):
|
|
_log.debug('TODO: check to not connect smultiple igNewCamImg')
|
|
if index > 0: # not showing camera image
|
|
_log.warning("listening to zescape")
|
|
pass
|
|
#self.timer.stop()
|
|
#zescape.start_listening()
|
|
#self.timer = QtCore.QTimer(self)
|
|
#self.timer.timeout.connect(self.check_zescape)
|
|
#self.timer.start(20)
|
|
else:
|
|
app=QApplication.instance()
|
|
try:
|
|
self.cb_new_frame_sim()
|
|
except AttributeError:
|
|
self.sigNewCamImg.connect(self.cb_update_img)
|
|
cam=app._camera
|
|
cfg=app._cfg
|
|
param=cam.get_param()
|
|
|
|
paramSv=cfg.value(AppCfg.GEO_CAM_PARAM)
|
|
if not paramSv:
|
|
cfg.setValue(AppCfg.GEO_CAM_PARAM, param)
|
|
else:
|
|
for k,v in param.items():
|
|
if v!=paramSv[k]:
|
|
if type(v)==tuple:
|
|
if v==tuple(paramSv[k]):
|
|
continue
|
|
cam.set_param(**paramSv)
|
|
#cfg.setValue(AppCfg.GEO_CAM_PARAM, param)
|
|
#break
|
|
_log.debug(f'diff:{k},{v} != {paramSv[k]}')
|
|
|
|
cam.run(self.cb_new_frame_pv)
|
|
app._pv_mon_lst.append(cam._pv['pic'])
|
|
|
|
def cb_new_frame_pv(self, **kwargs):
|
|
try: #pv-monitor-func
|
|
#thrd=threading.current_thread()
|
|
#_log.debug(f'thread:{thrd.getName()}, {thrd.native_id}')
|
|
#_log.debug(f"{kwargs['timestamp']}")
|
|
|
|
app=QApplication.instance()
|
|
cam=app._camera
|
|
sz=cam._sz
|
|
if kwargs['count']==sz[0]*sz[1]:
|
|
pic=kwargs['value'].reshape(sz[::-1])
|
|
else:
|
|
sz=app._camera.update_size()
|
|
pic=kwargs['value'].reshape(sz[::-1])
|
|
if pic.dtype==np.int16:
|
|
pic.dtype=np.uint16
|
|
camera.epics_cam.set_fiducial(pic, 255)
|
|
cam._pic=pic
|
|
cam._timestamp=kwargs['timestamp']
|
|
try:
|
|
cam.process()
|
|
except AttributeError as e:
|
|
pass
|
|
# self._goImg.setImage(cam._pic) caused some deadlocks.
|
|
# therefore try to update the image with signals instead
|
|
self.sigNewCamImg.emit()
|
|
except Exception as e:
|
|
_log.critical(f'{e}')
|
|
|
|
def cb_update_img(self):
|
|
#thrd=threading.current_thread()
|
|
#_log.debug(f'thread:{thrd.getName()}, {thrd.native_id}')
|
|
|
|
app=QApplication.instance()
|
|
cam=app._camera
|
|
|
|
self._goImg.setImage(cam._pic)
|
|
#vb.setRange(QRectF(-1300,-1100,1400,1200))
|
|
|
|
#force a segmentation fault
|
|
#try:
|
|
# dbgSegFault=self._dbgSegFault
|
|
#except AttributeError as e:
|
|
# dbgSegFault=self._dbgSegFault=0
|
|
#_log.warning(f'dbgSegFault:{dbgSegFault}')
|
|
#if dbgSegFault>100:
|
|
# _log.critical('Xforce a segmentation fault')
|
|
# import ctypes
|
|
# ctypes.string_at(0)
|
|
#self._dbgSegFault+=1
|
|
|
|
def cb_new_frame_sim(self, **kwargs):
|
|
app=QApplication.instance()
|
|
sim=app._camera._sim
|
|
imgSeq=sim['imgSeq']
|
|
idx=sim['imgIdx']
|
|
sim['imgIdx']=(idx+1)
|
|
imgIdx=idx%imgSeq.shape[0]
|
|
# _log.info('simulated idx:{}'.format(idx))
|
|
pic=imgSeq[imgIdx]
|
|
self._goImg.setImage(pic)
|
|
|
|
delay=500 # ms -> 2fps
|
|
QtCore.QTimer.singleShot(delay, self.cb_new_frame_sim)
|
|
|
|
#force a segmentation fault
|
|
#_log.warning(f'imgIdx:{imgIdx},idx:{idx}')
|
|
#if idx==20:
|
|
# _log.critical('force a segmentation fault')
|
|
# import ctypes
|
|
# ctypes.string_at(0)
|
|
|
|
def load_stylesheet(self):
|
|
with open(os.path.join(os.path.dirname(__file__),"swissmx.css"), "r") as sheet:
|
|
self.setStyleSheet(sheet.read())
|
|
|
|
def setup_sliders(self):
|
|
cont = self._wd_right
|
|
self._tweak_container = QWidget()
|
|
self._tweak_container.setLayout(QVBoxLayout())
|
|
cont.layout().addWidget(self._tweak_container)
|
|
layout = self._tweak_container.layout()
|
|
layout.setSpacing(0)
|
|
layout.setContentsMargins(0, 0, 0, 0)
|
|
|
|
self.zoombox = zoom.Zoom()
|
|
self.zoombox.init_settings()
|
|
self.zoombox.zoomChanged.connect(self.cb_zoom_changed)
|
|
self.zoombox.moveBacklight.connect(self.cb_move_backlight_safe)
|
|
|
|
layout.addWidget(self.zoombox)
|
|
|
|
toolbox = QToolBox()
|
|
layout.addWidget(toolbox)
|
|
|
|
self.build_group_faststage(toolbox)
|
|
# self.build_slits_group(toolbox)
|
|
self.build_group_collimator(toolbox)
|
|
self.build_group_posttube(toolbox)
|
|
self.build_group_xeye(toolbox)
|
|
#self.build_cryo_group(toolbox)
|
|
|
|
# monitor all axis for an axis fault
|
|
for key, tweaker in self.tweakers.items():
|
|
tweaker.event_axis_fault.connect(self.cb_axis_fault)
|
|
|
|
#self.tweakers["fast_x"].event_readback.connect(lambda alias, value, kw: self.fast_x_position.emit(value))
|
|
#self.tweakers["fast_y"].event_readback.connect(lambda alias, value, kw: self.fast_y_position.emit(value))
|
|
|
|
self.tweakers["fast_x"].event_rbv.connect(lambda rec_name, kw: self.track_objects())
|
|
self.tweakers["fast_y"].event_rbv.connect(lambda rec_name, kw: self.track_objects())
|
|
# layout.addStretch()
|
|
|
|
def cb_marker_moved(self,obj,*args,**kwargs):
|
|
_log.debug(args)
|
|
_log.debug(kwargs)
|
|
app=QApplication.instance()
|
|
cfg=app._cfg
|
|
geo=app._geometry
|
|
opt_ctr=geo._opt_ctr
|
|
if obj==self._goOptCtr:
|
|
oc_pos=obj.pos()
|
|
oc_sz=obj.size()
|
|
geo._opt_ctr=opt_ctr=np.array(oc_pos+oc_sz/2)
|
|
if (QMessageBox.question(self, "save config", f"save optical center position {opt_ctr} permanently?",
|
|
QMessageBox.Yes|QMessageBox.No, QMessageBox.No, )==QMessageBox.Yes):
|
|
cfg.setValue(AppCfg.GEO_OPT_CTR,opt_ctr)
|
|
elif obj==self._goBeamMarker:
|
|
bm_pos=obj.pos()
|
|
bm_sz=obj.size()
|
|
pos=np.array(bm_pos+bm_sz/2)*1000 # convert from mm to um
|
|
if (QMessageBox.question(self, "save config", f"save beam marker position {pos} permanently?",
|
|
QMessageBox.Yes|QMessageBox.No, QMessageBox.No, )==QMessageBox.Yes):
|
|
cfg.setValue(AppCfg.GEO_BEAM_POS,pos)
|
|
self.track_objects()
|
|
|
|
def cb_zoom_changed(self, value):
|
|
#self.zoomChanged.emit(value)
|
|
app=QApplication.instance()
|
|
geo=app._geometry
|
|
try:
|
|
geo.interp_zoom(value)
|
|
except AttributeError as e:
|
|
_log.warning(e)
|
|
else:
|
|
grp=self._goFixGrp
|
|
opt_ctr=geo._opt_ctr
|
|
A=np.asarray(geo._pix2pos.I)
|
|
#trf=cfg.value(AppCfg.GEO_CAM_TRF)
|
|
tr=self._goImgGrp.transform()
|
|
p=tr.map(opt_ctr[0],opt_ctr[1])
|
|
tr.setMatrix(A[0, 0], A[0, 1], 0,
|
|
A[1, 0], A[1, 1], 0,
|
|
p[0], p[1], 1) # translate dx,dy
|
|
grp.setTransform(tr)
|
|
|
|
#bm=self._goBeamMarker
|
|
#bm_sz=np.abs(geo.pos2pix(bm._size_eu))
|
|
#bm_pos=-opt_ctr-geo.pos2pix(bm._pos_eu)-bm_sz/2
|
|
#bm.blockSignals(True) # avoid to call cb_marker_moved
|
|
#bm.setPos(bm_pos,finish=False)
|
|
#bm.setSize(bm_sz)
|
|
#bm.blockSignals(False)
|
|
#_log.debug(f"zoom->{value} beam marker pos:{bm_pos} sz:{bm_sz})")
|
|
self.track_objects()
|
|
|
|
def cb_move_backlight_safe(self, pos):
|
|
app=QApplication.instance()
|
|
bl=app._backlight
|
|
# any move of backlight requires post sample tube out
|
|
self.move_post_tube("out")
|
|
bl.move(pos)
|
|
|
|
def get_tweaker(self, rec, mtype=0, **kwargs):
|
|
app = QApplication.instance()
|
|
sim=app._args.sim
|
|
if mtype == 0: # default epics motor record
|
|
if sim&0x10:
|
|
m_tw=SimMotorTweak()
|
|
else:
|
|
m_tw=MotorTweak()
|
|
elif mtype==1: # smaract motor record
|
|
if sim&0x20:
|
|
m_tw=SimMotorTweak()
|
|
else:
|
|
m_tw=MotorTweak()
|
|
m_tw.connect_motor(rec, **kwargs)
|
|
self.tweakers[kwargs['alias']] = m_tw
|
|
return m_tw
|
|
|
|
def build_post_tube_tandem_tweaker(self):
|
|
block = QWidget()
|
|
block.setAccessibleName("post tube tamden")
|
|
block.setContentsMargins(0, 0, 0, 0)
|
|
block.setLayout(QVBoxLayout())
|
|
c = QWidget()
|
|
glay = QGridLayout()
|
|
c.setLayout(glay)
|
|
|
|
b = QWidget()
|
|
blay = QHBoxLayout()
|
|
b.setLayout(blay)
|
|
blay.addWidget(QLabel("Tweak (mm)"))
|
|
self._post_tandem_tweak_val = QLineEdit("1.0")
|
|
blay.addWidget(self._post_tandem_tweak_val)
|
|
glay.addWidget(b, 0, 0, 1, 2)
|
|
|
|
icon = qtawesome.icon("material.arrow_back")
|
|
but = QPushButton(icon, "")
|
|
but.clicked.connect(lambda m, pos="x+": self.move_post_tube(pos))
|
|
glay.addWidget(but, 1, 0)
|
|
|
|
# c.setLayout(glay)
|
|
icon = qtawesome.icon("material.arrow_forward")
|
|
but = QPushButton(icon, "")
|
|
but.clicked.connect(lambda m, pos="x-": self.move_post_tube(pos))
|
|
glay.addWidget(but, 1, 1)
|
|
|
|
# c.setLayout(glay)
|
|
icon = qtawesome.icon("material.arrow_downward")
|
|
but = QPushButton(icon, "")
|
|
but.clicked.connect(lambda m, pos="y-": self.move_post_tube(pos))
|
|
glay.addWidget(but, 2, 0)
|
|
|
|
# c.setLayout(glay)
|
|
icon = qtawesome.icon("material.arrow_upward")
|
|
but = QPushButton(icon, "")
|
|
but.clicked.connect(lambda m, pos="y+": self.move_post_tube(pos))
|
|
glay.addWidget(but, 2, 1)
|
|
|
|
block.layout().addWidget(c)
|
|
return block
|
|
|
|
def cb_axis_fault(self, pvname, kw):
|
|
"""
|
|
swissmx - {'pvname': 'SAR-EXPMX:MOT_FY.STAT', 'value': 0, 'char_value': 'NO_ALARM', 'status': 0, 'ftype': 17, 'chid': 38980392,
|
|
'host': 'SAR-CPPM-EXPMX1.psi.ch:5064', 'count': 1, 'access': 'read-only', 'write_access': False, 'read_access': True,
|
|
'severity': 0,
|
|
'timestamp': 1537867290.914189, 'precision': None, 'units': None, 'enum_strs': ('NO_ALARM', 'READ', 'WRITE', 'HIHI', 'HIGH', 'LOLO', 'LOW', 'STATE', 'COS', 'COMM', 'TIMEOUT', 'HWLIMIT', 'CALC', 'SCAN', 'LINK', 'SOFT'), 'upper_disp_limit': None, 'lower_disp_limit': None, 'upper_alarm_limit': None, 'lower_alarm_limit': None, 'lower_warning_limit': None, 'upper_warning_limit': None, 'upper_ctrl_limit': None, 'lower_ctrl_limit': None, 'nelm': 1, 'type': 'time_enum', 'typefull': 'time_enum', 'motor_field': 'STAT', 'source_field': 'STAT', 'cb_info': (1, <PV 'SAR-EXPMX:MOT_FY.STAT', count=1, type=time_enum, access=read-only>), 'alias': 'fast Y'}
|
|
|
|
:param pvname:
|
|
:param kw:
|
|
:return:
|
|
"""
|
|
_log.debug(f"AXIS FAULT: {kw}")
|
|
if kw["severity"]:
|
|
msg = f"axis {pvname} has a fault"
|
|
_log.critical(msg)
|
|
_log.critical(f"{kw}")
|
|
else:
|
|
msg = ""
|
|
self._message_critical_fault.setText(msg)
|
|
|
|
@staticmethod
|
|
def pv_monitor_add(widgets):
|
|
#pv-monitor-func
|
|
# this is a function to search for a issue that crashes the application
|
|
# during data acquisition
|
|
#_log.info('modify monitors')
|
|
app=QApplication.instance()
|
|
pv_mon_lst=app._pv_mon_lst
|
|
try:
|
|
for w in widgets[:-1]: #ignore last item
|
|
for pv in w._motor._pvs.values():
|
|
pv_mon_lst.append(pv)
|
|
# for k,pv in w._motor._pvs.items():
|
|
# pv.auto_monitor=False
|
|
# for k,pv in w._motor._pvs.items():
|
|
# pv.auto_monitor=True
|
|
except AttributeError as e:
|
|
if type(w)==SimMotorTweak:
|
|
_log.warning(f'{e},{w},{w._motor._rec_name}')
|
|
else:
|
|
_log.warning(f'{e},{w},{pv}')
|
|
sim=app._args.sim
|
|
assert sim&0x10,'assuming simulated motors'
|
|
|
|
|
|
def build_group_faststage(self, toolbox):
|
|
pfx=QApplication.instance()._cfg.value(AppCfg.GBL_DEV_PREFIX)[0]
|
|
c=QWidget()
|
|
c.setLayout(QGridLayout())
|
|
f=lambda v: lambda x:self.move_detector(v) #needs nested lambda to work ...
|
|
for i,v in enumerate(('in','out')):
|
|
but=QPushButton('detector '+v)
|
|
but.clicked.connect(f(v))
|
|
c.layout().addWidget(but, 0, i)
|
|
widgets=[
|
|
# self.get_tweaker('f"{pfx}:MOT_BLGT', alias='backlight', label='backlight'),
|
|
self.get_tweaker(f"{pfx}:MOT_FY", alias="fast_y", label="fast Y"),
|
|
self.get_tweaker(f"{pfx}:MOT_FX", alias="fast_x", label="fast X"),
|
|
self.get_tweaker(f"{pfx}:MOT_CY", alias="base_y", label="base Y"),
|
|
self.get_tweaker(f"{pfx}:MOT_CX", alias="base_x", label="base X"),
|
|
self.get_tweaker(f"{pfx}:MOT_CZ", alias="base_z", label="base Z"),
|
|
self.get_tweaker(f"{pfx}:MOT_DET_Z", alias="det_z", label="detector Z"),
|
|
c
|
|
]
|
|
qutilities.add_item_to_toolbox(toolbox,"Fast Stage",widget_list=widgets)
|
|
WndSwissMx.pv_monitor_add(widgets)
|
|
|
|
def build_group_collimator(self, toolbox):
|
|
pfx=QApplication.instance()._cfg.value(AppCfg.GBL_DEV_PREFIX)[1]
|
|
_log.warning('TODO: cleanup PV names!!!')
|
|
pfx='SARES30-SMX'
|
|
c=QWidget()
|
|
c.setLayout(QGridLayout())
|
|
f=lambda v: lambda x:self.move_collimator(v) #needs nested lambda to work ...
|
|
for i,v in enumerate(('in','out','ready')):
|
|
but=QPushButton(v)
|
|
but.clicked.connect(f(v))
|
|
c.layout().addWidget(but, 0, i)
|
|
widgets = [
|
|
self.get_tweaker(f"{pfx}:MCS1", alias="colli_x", label="colli X", mtype=1,),
|
|
self.get_tweaker(f"{pfx}:MCS2", alias="colli_y", label="colli Y", mtype=1,),
|
|
c,
|
|
]
|
|
qutilities.add_item_to_toolbox(toolbox,"Collimator",widget_list=widgets)
|
|
WndSwissMx.pv_monitor_add(widgets)
|
|
|
|
def build_group_posttube(self, toolbox):
|
|
pfx=QApplication.instance()._cfg.value(AppCfg.GBL_DEV_PREFIX)[1]
|
|
_log.warning('TODO: cleanup PV names!!!')
|
|
pfx='SARES30-SMX'
|
|
widgets = [
|
|
self.get_tweaker(f"{pfx}:MCS4", alias="tube_usx", label="upstream X", mtype=1,),
|
|
self.get_tweaker(f"{pfx}:MCS6", alias="tube_usy", label="upstream Y", mtype=1,),
|
|
self.get_tweaker(f"{pfx}:MCS5", alias="tube_dsx", label="downstream X", mtype=1,),
|
|
self.get_tweaker(f"{pfx}:MCS7", alias="tube_dsy", label="downstream Y", mtype=1,),
|
|
self.get_tweaker(f"{pfx}:MCS8", alias="tube_z", label="tube Z", mtype=1),
|
|
]
|
|
c = QWidget()
|
|
c.setLayout(QGridLayout())
|
|
but = QPushButton("post tube out")
|
|
but.clicked.connect(lambda m, pos="out": self.move_post_tube(pos))
|
|
c.layout().addWidget(but, 0, 0)
|
|
but = QPushButton("post tube in")
|
|
but.clicked.connect(lambda m, pos="in": self.move_post_tube(pos))
|
|
c.layout().addWidget(but, 0, 1)
|
|
widgets.append(c)
|
|
|
|
label = "Post Sample Tube"
|
|
block = QWidget()
|
|
block.setAccessibleName(label)
|
|
block.setContentsMargins(0, 0, 0, 0)
|
|
block.setLayout(QVBoxLayout())
|
|
for w in widgets:
|
|
block.layout().addWidget(w)
|
|
|
|
block.layout().addWidget(self.build_post_tube_tandem_tweaker())
|
|
block.layout().addStretch()
|
|
toolbox.addItem(block, label)
|
|
|
|
def build_group_xeye(self, toolbox):
|
|
pfx=QApplication.instance()._cfg.value(AppCfg.GBL_DEV_PREFIX)[1]
|
|
_log.warning('TODO: cleanup PV names!!!')
|
|
pfx='SARES30-SMX'
|
|
widgets=[
|
|
self.get_tweaker(f"{pfx}:MCS14", alias="xeye_x", label="X", mtype=1),
|
|
self.get_tweaker(f"{pfx}:MCS15", alias="xeye_y", label="Y", mtype=1),
|
|
]
|
|
qutilities.add_item_to_toolbox(toolbox,"X-Ray Eye",widget_list=widgets)
|
|
WndSwissMx.pv_monitor_add(widgets)
|
|
|
|
def set_active_task(self, task):
|
|
_log.info("TASK == {}".format(task))
|
|
self._active_task = task
|
|
|
|
def active_task(self):
|
|
return self._active_task
|
|
|
|
def cb_mouse_move(self, pos):
|
|
app = QApplication.instance()
|
|
geo = app._geometry
|
|
#pos = pixel position on the widget
|
|
task = self.active_task()
|
|
z = app._zoom.get_val()
|
|
|
|
bm=self._goBeamMarker
|
|
#pos=event.scenePos()
|
|
pImg=pg.Point(self._goImg.mapFromScene(pos))
|
|
pTrk=pg.Point(self._goTracked.mapFromScene(pos))
|
|
pFix=pg.Point(self._goFixGrp.mapFromScene(pos))
|
|
pFix-=bm.pos()+bm.size()/2
|
|
fx=self.tweakers["fast_x"].get_val()
|
|
fy=self.tweakers["fast_y"].get_val()
|
|
pRel=pTrk-(fx,fy)
|
|
#s=f'pImg{pImg} pTrk{pTrk} bm._pos_eu{bm._pos_eu}'
|
|
try:
|
|
pln=geo._fitPlane
|
|
except AttributeError:
|
|
cz=np.nan
|
|
else:
|
|
cz=pln[0]*pTrk[0]+pln[1]*pTrk[1]+pln[2] # z=ax+by+c
|
|
|
|
s=\
|
|
f'img pix ({pImg[0]:0.1f} {pImg[1]:0.1f})px \u23A2 ' \
|
|
f'stage ({pTrk[0]:0.4f} {pTrk[1]:>0.4f} {cz:>0.4f})mm \u23A2 ' \
|
|
f'dist to beam ({pFix[0]:>0.4f} {pFix[1]:>0.4f})mm '
|
|
#f'dist to beam ({pRel[0]:>0.6g} {pRel[1]:>0.6g}mm) '
|
|
|
|
#_log.debug(s)
|
|
self._lb_coords.setText(s)
|
|
#self._lb_coords.setText(
|
|
# "\u23FA{:>}:{:>6.0f} {:<.0f}"
|
|
# "\u23FA{:>}:{:>6.0f} {:<.0f}"
|
|
# "\u23FA{:>}:{:<.0f}"
|
|
# "\u23FA{:>}:{:>6.1f} {:<.1f} \u00B5"
|
|
# "\u23FA{:>}:{:>7.3f} {:<.3f} mm".format(
|
|
# "Beam at", bx, by,
|
|
# "Pixel coord ", x, y,
|
|
# "PPM", ppm,
|
|
# "Distance to beam", 1000 * dx, 1000 * dy,
|
|
# "Stage", fx, fy,
|
|
# )
|
|
#)
|
|
|
|
def cb_mouse_click_geometry_calib(self, event):
|
|
app=QApplication.instance()
|
|
zoom = app._zoom.get_val()
|
|
fast_x=self.tweakers["fast_x"];fast_y=self.tweakers["fast_y"]
|
|
if self._btn_pix2pos.isChecked():
|
|
try:
|
|
raw=app._raw_pix2pos[zoom]
|
|
except KeyError as e:
|
|
raw=app._raw_pix2pos[zoom]=list()
|
|
imgPos=self._goImg.mapFromScene(event.scenePos())
|
|
fx=fast_x.get_rbv();
|
|
fy=fast_y.get_rbv()
|
|
x=imgPos.x();y=imgPos.y()
|
|
_log.debug('append calib: zoom:{} stage_x_y:{}/{} cam_pix_x_y:{}/{}'.format(zoom, fx, fy, x, y))
|
|
raw.append((fx, fy, x, y))
|
|
self._wd_calib_tree.setData(app._raw_pix2pos)
|
|
elif self._btn_opt_ctr.isChecked():
|
|
try:
|
|
raw=app._raw_opt_ctr[zoom]
|
|
except KeyError as e:
|
|
raw=app._raw_opt_ctr[zoom]=list()
|
|
imgPos=self._goImg.mapFromScene(event.scenePos())
|
|
x=imgPos.x();y=imgPos.y()
|
|
_log.debug('append calib: zoom:{} cam_pix_x_y:{}/{}'.format(zoom, x, y))
|
|
raw.append(( x, y))
|
|
self._wd_calib_tree.setData(app._raw_opt_ctr)
|
|
else:
|
|
QMessageBox.warning(self, "calibration", "no calibration started yet.\nPress 'pix2pos' or 'opt_ctr' button first")
|
|
|
|
|
|
def cb_mouse_click(self, event):
|
|
#_log.debug("{}".format(event))
|
|
#_log.debug("screen pos {}".format(event.screenPos())) #pixel position on the whole screen
|
|
#_log.debug("scene pos {}".format(event.scenePos())) #pixel position on the scene (including black frame)
|
|
#_log.debug(" pos {}".format(event.pos())) #pixel position of the ckicked object mapped to its coordinates
|
|
#p=event.scenePos()
|
|
#_log.debug(f"vb pos {self.vb.mapFromScene(p)}") #pixel position on the scene (including black frame)
|
|
#for o in self.vb.childGroup.childItems():
|
|
# _log.debug(f"{type(o)} pos {o.mapFromScene(p)}") #pixel position on the scene (including black frame)
|
|
|
|
#_log.debug(f"currentItem:{event.currentItem}")
|
|
app=QApplication.instance()
|
|
mod=event.modifiers()
|
|
btn=event.button()
|
|
task = self.active_task()
|
|
if task==TASK_SETUP_GEOMETRY_CALIB:
|
|
if btn==Qt.LeftButton and not mod&Qt.ControlModifier:
|
|
self.cb_mouse_click_geometry_calib(event)
|
|
return
|
|
################################
|
|
# Ctrl-left : move to position
|
|
# Ctrl-right : object tree
|
|
################################
|
|
|
|
#dblclick = event.double()
|
|
if btn==Qt.LeftButton:
|
|
if mod&Qt.ShiftModifier:
|
|
pass
|
|
elif mod&Qt.ControlModifier:
|
|
pos=event.scenePos()
|
|
_log.debug(f'move to position :scene pos {pos}')
|
|
geo=app._geometry
|
|
|
|
bm=self._goBeamMarker
|
|
pImg=pg.Point(self._goImg.mapFromScene(pos))
|
|
pTrk=pg.Point(self._goTracked.mapFromScene(pos))
|
|
pFix=pg.Point(self._goFixGrp.mapFromScene(pos))
|
|
pFix-=bm.pos()+bm.size()/2
|
|
|
|
|
|
#create trace
|
|
self.add_camera_trace(pFix)
|
|
|
|
_log.debug(f'dist to beam ({pFix[0]:>0.6g} {pFix[1]:>0.6g}mm)')
|
|
fx_motor=self.tweakers["fast_x"]
|
|
fy_motor=self.tweakers["fast_y"]
|
|
|
|
fx_motor.move_rel(pFix[0])
|
|
fy_motor.move_rel(pFix[1])
|
|
|
|
#fx_motor.move_abs(fx_motor.get_val()+pFix[0])
|
|
#fy_motor.move_abs(fy_motor.get_val()+pFix[1])
|
|
|
|
#if none of the image is visible, recenter it
|
|
r1=self.vb.viewRect()
|
|
r2=self.vb.itemBoundingRect(self._goImg)
|
|
if not r1.intersects(r2):
|
|
r1.translate(r2.center()-r1.center())
|
|
self.vb.setRange(r1)
|
|
|
|
try:
|
|
pln=geo._fitPlane
|
|
except AttributeError: pass
|
|
else:
|
|
cz_motor=self.tweakers["base_z"]
|
|
cz=pln[0]*pTrk[0]+pln[1]*pTrk[1]+pln[2] #z=ax+by+c
|
|
if cz:
|
|
cz_motor.move_abs(cz)
|
|
else:
|
|
_log.critical(f'{cz} {pTrk} {pln}')
|
|
_log.debug(f'cz={pln[0]:.4g}*{pTrk[0]:.4g}+{pln[1]:.4g}*{pTrk[1]:.4g}+{pln[2]:.4g}={cz}')
|
|
pass
|
|
elif mod&Qt.AltModifier:
|
|
pass
|
|
else:
|
|
pass
|
|
elif btn==Qt.RightButton:
|
|
if mod&Qt.ShiftModifier:
|
|
pass
|
|
elif mod&Qt.ControlModifier:
|
|
_log.debug('object tree')
|
|
UsrGO.obj_tree(self.vb)
|
|
pass
|
|
elif mod&Qt.AltModifier:
|
|
pass
|
|
else:
|
|
pass
|
|
return
|
|
|
|
return
|
|
assert(event.currentItem==self.vb)
|
|
#UsrGO.obj_info(self.img)
|
|
|
|
event.pos()# return Point(self.currentItem.mapFromScene(self._scenePos))
|
|
|
|
app=QApplication.instance()
|
|
|
|
if event.button() == Qt.RightButton:
|
|
UsrGO.obj_info(self.vb.childTransform())
|
|
event.ignore()
|
|
return
|
|
pos = event.scenePos()
|
|
zoom_level = app._zoom.get_val()
|
|
_log.debug(" zoom {}".format(zoom_level))
|
|
|
|
try:
|
|
ppm = self.ppm_fitter(zoom_level)
|
|
except:
|
|
ppm = 1E3
|
|
dblclick = event.double()
|
|
shft = Qt.ShiftModifier & event.modifiers()
|
|
ctrl = Qt.ControlModifier & event.modifiers()
|
|
alt = Qt.AltModifier & event.modifiers()
|
|
|
|
xy = self._goImg.mapFromScene(pos)
|
|
x, y = xy.x(), xy.y()
|
|
bx, by = self.get_beam_mark_on_camera_xy()
|
|
a = np.asarray([x, y])
|
|
b = np.asarray([bx, by])
|
|
pixel_dist = np.linalg.norm(a - b)
|
|
_log.debug("distance to beam: {:.1f} pixels".format(pixel_dist))
|
|
|
|
omega = omega_motor.get_position()
|
|
_log.debug("omega at {:.03f} degrees".format(omega))
|
|
|
|
fx = fx_motor.get_position()
|
|
fy = fy_motor.get_position()
|
|
dx = (x - bx) / ppm
|
|
dy = -(y - by) / ppm
|
|
fx += dx
|
|
fy += dy
|
|
|
|
_log.debug("click at : {:.3f} {:.3f}".format(x, y))
|
|
_log.debug("beam at : {:.3f} {:.3f}".format(bx, by))
|
|
_log.debug("gonio at : {:.3f} {:.3f}".format(fx, fy))
|
|
|
|
# publisher.submit(
|
|
# {"event": "gonio", "data": self.get_gonio_positions(as_json=True)}
|
|
# )
|
|
_log.debug("TASK = {}".format(task))
|
|
# Click without modifers moves clicked position to beam mark
|
|
if task not in (TASK_SETUP_PPM_CALIBRATION, TASK_SETUP_BEAM_CENTER):
|
|
if not (ctrl or shft or alt):
|
|
if ppm == 0:
|
|
_log.warning("PPM is not defined - do the PPM calibration")
|
|
|
|
if task == TASK_HELICAL:
|
|
dx = (x - bx) / ppm
|
|
dy = -(y - by) / ppm
|
|
_log.info("moving base_X, fast_Y {:.3f}, {:.3f}".format(dx, dy))
|
|
bx_motor.move_rel(dx)
|
|
fy_motor.move_rel(dy)
|
|
else:
|
|
dx = np.cos(omega * np.pi / 180) * (x - bx) / ppm
|
|
dy = -(y - by) / ppm
|
|
_log.info("moving fast x/y to {:.3f}, {:.3f}".format(dx, dy))
|
|
fx_motor.move_rel(dx)
|
|
fy_motor.move_rel(dy)
|
|
self.fast_dx_position.emit(dx)
|
|
self.fast_dy_position.emit(dy)
|
|
return
|
|
|
|
# CTRL-Click always just moves Z
|
|
if ctrl and not (shft or alt):
|
|
mm = np.sign(y - by) * pixel_dist / 10000.
|
|
_log.debug("moving Z stage by {:.3f} mm".format(mm))
|
|
bz_motor.move_rel(mm)
|
|
return
|
|
|
|
if task in (TASK_SETUP_PPM_CALIBRATION, TASK_SETUP_BEAM_CENTER):
|
|
if ctrl and self._ppm_calibration.isChecked():
|
|
if self._ppm_click is not None:
|
|
feature_dist = self._ppm_feature_size_spinbox.value() / 1000.
|
|
b = self._ppm_click
|
|
pixel_dist = np.linalg.norm(a - b)
|
|
ppm = pixel_dist / feature_dist
|
|
_log.debug("calib save a {}".format(a))
|
|
_log.debug("calib save b {}".format(b))
|
|
_log.debug(" feature {}".format(feature_dist))
|
|
_log.debug("a -> b pixels {}".format(pixel_dist))
|
|
_log.debug(" ppm({}) {}".format(zoom_level, pixel_dist))
|
|
|
|
self._zoom_to_ppm[zoom_level] = ppm
|
|
settings.setValue(CAMERA_ZOOM_TO_PPM, self._zoom_to_ppm)
|
|
self._ppm_click = None
|
|
_log.debug("ppm data so far {}".format(self._zoom_to_ppm))
|
|
else:
|
|
_log.debug("calib save b {}".format(a))
|
|
self._ppm_click = a
|
|
return
|
|
elif task == TASK_SETUP_BEAM_CENTER:
|
|
if ctrl and not (shft or alt):
|
|
_log.info("beam mark add: zoom {} => {:.1f} {:.1f}".format(zoom_level, x, y))
|
|
self.append_to_beam_markers(x, y, zoom_level)
|
|
else:
|
|
_log.warning("click combination not available")
|
|
elif task == TASK_GRID:
|
|
if shft and not (ctrl or alt):
|
|
self.addGridRequest.emit(fx, fy)
|
|
elif task == TASK_PRELOCATED:
|
|
if ctrl and shft:
|
|
omegacos = option(DELTATAU_OMEGACOS)
|
|
if omegacos:
|
|
fx = fx / np.cos(omega * np.pi / 180)
|
|
_log.info(
|
|
"selectin fiducial (omegacos): {:.3f}, {:.3f}".format(fx, fy)
|
|
)
|
|
else:
|
|
_log.info("selectin fiducial: {:.3f}, {:.3f}".format(fx, fy))
|
|
self.fiducialPositionSelected.emit(x, y, fx, fy)
|
|
elif shft and alt:
|
|
self.appendPrelocatedPosition.emit(x, y, fx, fy)
|
|
elif task == TASK_HELICAL:
|
|
_log.info("no action associated ")
|
|
elif task == TASK_EMBL:
|
|
pass
|
|
else:
|
|
pass
|
|
|
|
def track_objects(self):
|
|
#self._goTracked= dictionary with infos about tracked objects
|
|
# objLst = list of objects to track
|
|
# state = (zoom, fx, fy) (created at first call of self.track_objects() )
|
|
app=QApplication.instance()
|
|
geo=app._geometry
|
|
zoom = app._zoom.get_val()
|
|
twk_fx=self.tweakers["fast_x"];twk_fy=self.tweakers["fast_y"]
|
|
# TODO: get_position() is delayed as it is the RBV. do same as for the zoom
|
|
# TODO: and return the last set_point
|
|
fx=twk_fx.get_rbv()
|
|
fy=twk_fy.get_rbv()
|
|
try:
|
|
grp=self._goTracked
|
|
grpTrc=self._goTrace
|
|
zoom_old,fx_old,fy_old=grp._state
|
|
except AttributeError: # at initialization
|
|
self._goTracked=grp=pg.ItemGroup()
|
|
grp.setZValue(20) #higher than beam marker and opt ctr that have z=10
|
|
self.vb.addItem(grp)
|
|
grp._state=(zoom,fx,fy)
|
|
zoom_old=fx_old=fy_old=None
|
|
|
|
#--- camera tracing images ---
|
|
self._goTrace=grpTrc=pg.ItemGroup() # the transformation is the same as for tracked objects
|
|
grpTrc.setZValue(-20) #lower than other objects
|
|
self.vb.addItem(grpTrc)
|
|
|
|
try:
|
|
_log.debug(f"zoom:{zoom_old}->{zoom:d} fx:{fx_old:.3f}->{fx:.3f} fy:{fy_old:.3f}->{fy:.3f}")
|
|
except TypeError:
|
|
_log.debug(f"zoom:{zoom_old}->{zoom} fx:{fx_old}->{fx:.3f} fy:{fy_old}->{fy:.3f}")
|
|
if zoom_old!=zoom:
|
|
geo.interp_zoom(zoom)
|
|
|
|
bm=self._goBeamMarker
|
|
opt_ctr=geo._opt_ctr
|
|
A=np.asarray(geo._pix2pos.I)
|
|
# trf=cfg.value(AppCfg.GEO_CAM_TRF)
|
|
tr=self._goFixGrp.transform()
|
|
p1=bm.pos()+bm.size()/2+(-fx,-fy)
|
|
#p=tr.map(opt_ctr[0], opt_ctr[1])
|
|
p=tr.map(p1[0], p1[1])
|
|
tr.setMatrix(A[0, 0], A[0, 1], 0,
|
|
A[1, 0], A[1, 1], 0,
|
|
p[0], p[1], 1) # translate dx,dy
|
|
grp.setTransform(tr)
|
|
grpTrc.setTransform(tr)
|
|
|
|
def add_camera_trace(self,delta_mm):
|
|
# add up to tracelen image when moveing motors to new positions
|
|
try:
|
|
tracelen=self._goTrace._tracelen
|
|
except AttributeError:
|
|
app=QApplication.instance()
|
|
cfg=app._cfg
|
|
tracelen=self._goTrace._tracelen=cfg.value(AppCfg.GBL_MISC)['img_trace_len']
|
|
if tracelen==0:
|
|
return
|
|
|
|
|
|
trImg=self._goImgGrp.transform()
|
|
trTrk=self._goTracked.transform()
|
|
tr=trImg*trTrk.inverted()[0] # TODO: check if oder correct !!!
|
|
cm=pg.ColorMap((0,255),((0,0,64),(255,255,128)))#,opacity=.5)
|
|
img=pg.ImageItem(self._goImg.image,lut=cm.getLookupTable(0,255))
|
|
img.setTransform(tr)
|
|
self._goTrace.addItem(img) # automatically added to self.vb
|
|
|
|
|
|
cldLst=self._goTrace.childItems()
|
|
if len(cldLst)>tracelen: #check if one image needs to be removed
|
|
r1=self.vb.itemBoundingRect(img)
|
|
for c in cldLst[:-1]:
|
|
#check overlap with newest image, and remove previous if it is more than 30%
|
|
r2=self.vb.itemBoundingRect(c)
|
|
r3=r1.intersected(r2)
|
|
s3=r3.size()
|
|
s1=r1.size()
|
|
if s3.width()/s1.width()>0.3 and s3.height()/s1.height()>0.3:
|
|
self.vb.removeItem(c)
|
|
return
|
|
self.vb.removeItem(cldLst[0]) #if no one was removes, remove oldest
|
|
|
|
|
|
|
|
def cb_modify_app_param(self):
|
|
wnd=WndParameter(self)
|
|
wnd.show()
|
|
|
|
@pyqtSlot()
|
|
def cb_save_cam_image(self,overlays=False):
|
|
app=QApplication.instance()
|
|
cam=app._camera
|
|
filename, _ = QFileDialog.getSaveFileName(self,"Save data file",'', 'PNG files (*.png);;all files (*)')
|
|
if not filename:
|
|
return
|
|
_log.info(f"saving view screenshot: {filename}")
|
|
#try:
|
|
if not overlays:
|
|
import PIL.Image
|
|
#img=PIL.Image.fromarray(cam.pic.astype(np.uint8))
|
|
try:
|
|
pic=cam._pic
|
|
mx=pic.max()
|
|
if pic.max()>255:
|
|
scl=2**int(round(np.log2(mx)-8))
|
|
pic=np.array(pic/scl,dtype=np.uint8)
|
|
elif pic.dtype!=np.uint8:
|
|
pic=np.array(pic, dtype=np.uint8)
|
|
except AttributeError:
|
|
sim=app._camera._sim
|
|
pic=cam._sim['imgSeq'][sim['imgIdx']]
|
|
img=PIL.Image.fromarray(pic)
|
|
img.save(filename)
|
|
else:
|
|
exporter = pg.exporters.ImageExporter(self.vb)
|
|
# set export parameters if needed
|
|
#exporter.parameters()["width"] = 2000 # (note this also affects height parameter)
|
|
# save to file
|
|
exporter.export(filename)
|
|
|
|
#except Exception as e:
|
|
# _log.warning(e)
|
|
# QMessageBox.warning(self, "Screenshot: failed to save image", "Failed to save screenshot!")
|
|
|
|
def cb_execute_collection(self):
|
|
app=QApplication.instance()
|
|
geo=app._geometry
|
|
#zoom=app._zoom.get_val()
|
|
task = self.active_task()
|
|
#self._is_aborted = False
|
|
#method = self._tabs_daq_methods.currentWidget().accessibleName()
|
|
|
|
|
|
|
|
if task == TASK_FIX_TARGET:
|
|
#self.re_connect_collect_button(callback=self.collect_abort_grid, accessibleName="grid_abort", label="Abort Grid Scan",)
|
|
#self._inspect = self._grid_inspect_area
|
|
#self._inspect.setPlainText("")
|
|
|
|
fast_x=self.tweakers["fast_x"]
|
|
fast_y=self.tweakers["fast_y"]
|
|
fx=fast_x.get_val()
|
|
fy=fast_y.get_val()
|
|
opt_ctr=geo._opt_ctr
|
|
grp=self._goTracked
|
|
cldLst=grp.childItems()
|
|
for go in cldLst:
|
|
if type(go)==UsrGO.Fiducial:
|
|
continue
|
|
t=type(go)
|
|
if t not in(UsrGO.FixTargetFrame,UsrGO.Path,UsrGO.Grid):
|
|
_log.warning(f'{t} not supported for FixTargetFrame ->skipped:{go}')
|
|
continue
|
|
try:
|
|
param=go.get_scan_param() #points in coordinate system of ROI
|
|
except AttributeError as e:
|
|
_log.warning(f'no scan parameters for object->skipped:{go}')
|
|
continue
|
|
vb=self.vb
|
|
grp=self._goTracked
|
|
mft=self._moduleFixTarget
|
|
try:
|
|
tmpGoLst=self._goTmp
|
|
except AttributeError:
|
|
tmpGoLst=self._goTmp=[]
|
|
else:
|
|
for go in tmpGoLst:
|
|
vb.removeItem(go)
|
|
tmpGoLst.clear()
|
|
|
|
# adding debug fiducials
|
|
#n=int(p.shape[0]/100)+1
|
|
#for i in range(0,p.shape[0],n):
|
|
# fx,fy=p[i, :]/1000
|
|
# fx=-fx #X axis has inverted sign !
|
|
# l=.06
|
|
# go=UsrGO.Fiducial((fx-l/2, fy-l/2), (l, l), i)
|
|
# grp.addItem(go)
|
|
# tmpGoLst.append(go)
|
|
#mft._tree.setData(grp.childItems())
|
|
|
|
#_log.debug(f'first point:{p[0,:]}')
|
|
#_log.debug(f'step to 2nd point:{p[1,:]-p[0,:]}')
|
|
#print('DEBUG: difference from point to point:')
|
|
#print(np.diff(p, axis=0))
|
|
self.daq_collect(**param)
|
|
|
|
elif task == TASK_GRID:
|
|
self.re_connect_collect_button(
|
|
callback=self.collect_abort_grid,
|
|
accessibleName="grid_abort",
|
|
label="Abort Grid Scan",
|
|
)
|
|
self._inspect = self._grid_inspect_area
|
|
self._inspect.setPlainText("")
|
|
|
|
geo=app._geometry
|
|
zoom=app._zoom.get_val()
|
|
fast_x=self.tweakers["fast_x"];
|
|
fast_y=self.tweakers["fast_y"]
|
|
fx=fast_x.get_val()
|
|
fy=fast_y.get_val()
|
|
opt_ctr=geo._opt_ctr
|
|
grp=self._goTracked
|
|
cldLst=grp.childItems()
|
|
for go in cldLst:
|
|
points=go.get_points() #points in coordinate system of ROI
|
|
# names consists of abrevations
|
|
# part 0: po=position sz=size dt=delta
|
|
# part 1: px=pixel eu=engineering units (e.g. mm)
|
|
po_px=go.pos()
|
|
sz_px=go.size()
|
|
tr=go.transform() # TODO: this is not yet used
|
|
UsrGO.obj_info(tr)
|
|
dt_px=-opt_ctr-po_px
|
|
dt_eu=geo.pix2pos(dt_px)+(fx,fy)
|
|
for i in range(points.shape[0]):
|
|
points[i,:]=dt_eu+geo.pix2pos(points[i,:])#*tr
|
|
|
|
method="grid"
|
|
params=None #grid._grid_dimensions
|
|
# params = ([grid._grid_dimensions[0]], [grid._grid_dimensions[1]]) # FIXME something wrong here<
|
|
self.daq_collect_points(points, visualizer_method=method, visualizer_params=params)
|
|
|
|
#self.daq_grid_collect_grid()
|
|
elif task == TASK_PRELOCATED:
|
|
self._inspect = self._preloc_inspect_area
|
|
self._inspect.setPlainText("")
|
|
self.daq_prelocated_collect_points()
|
|
elif task == TASK_HELICAL:
|
|
self.daq_helical_collect()
|
|
elif task == TASK_EMBL:
|
|
self.daq_embl_collect_points()
|
|
|
|
def cb_esc_sample_exchange(self):
|
|
app=QApplication.instance()
|
|
steps = []
|
|
#if option(CRYOJET_MOTION_ENABLED):
|
|
# steps.append(lambda: self.move_cryojet_nozzle("out"))
|
|
steps=[
|
|
lambda:self.move_detector("out"),
|
|
lambda: self.move_post_tube("out"),
|
|
lambda: app._backlight.move("out"),
|
|
lambda: self.move_collimator("out"),
|
|
lambda:self.move_gonio("mount"),
|
|
]
|
|
self.esc_run_steps(steps, "Transitioning to Sample Exchange","ManualSampleExchange")
|
|
|
|
def cb_esc_sample_alignment(self):
|
|
app=QApplication.instance()
|
|
steps = [
|
|
# lambda: sample_selection.tell.set_current(30.0),
|
|
lambda:self.move_detector("out"),
|
|
lambda: self.move_collimator("out"),
|
|
lambda:self.move_detector("out"),
|
|
lambda:self.move_post_tube("out"),
|
|
lambda:app._backlight.move("in"),
|
|
lambda:self.move_collimator("out"),
|
|
lambda:self.move_gonio("align"),
|
|
]
|
|
#if option(CRYOJET_MOTION_ENABLED):
|
|
# steps.extend([lambda: self.move_cryojet_nozzle("in")])
|
|
#steps.extend([lambda: self.move_post_tube("out"), lambda: app._backlight.move("in")])
|
|
self.esc_run_steps(steps, "Transitioning to Sample Alignment","SampleAlignment")
|
|
|
|
def cb_esc_data_collection(self):
|
|
app=QApplication.instance()
|
|
steps = [
|
|
# lambda: sample_selection.tell.set_current(30.0),
|
|
lambda: app._backlight.move("out"),
|
|
lambda: self.move_post_tube("in"),
|
|
lambda: self.move_collimator("in"),
|
|
lambda:self.move_detector("in"),
|
|
]
|
|
#if option(CRYOJET_MOTION_ENABLED):
|
|
# steps.extend([lambda: self.move_cryojet_nozzle("in")])
|
|
self.esc_run_steps(steps, "Transitioning to Data Collection","DataCollection")
|
|
|
|
def cb_really_quit(self):
|
|
"""called when user Ctrl-Q the app"""
|
|
if QMessageBox.question(self, "", "Are you sure you want to quit?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No,) == QMessageBox.Yes:
|
|
self._do_quit = True
|
|
self.close()
|
|
|
|
def cb_deltatau_home_faststages(self):
|
|
pfx=QApplication.instance()._cfg.value(AppCfg.GBL_DEV_PREFIX)[0]
|
|
_log.info("homing fast stages")
|
|
epics.PV(f"{pfx}1:ASYN.AOUT").put(b"enable plc 1")
|
|
|
|
def cb_about(self):
|
|
try:
|
|
ver,gitcmt=get_version()
|
|
v_SMX=f'{ver} git:{gitcmt}'
|
|
except:
|
|
v_SMX='git version failed'
|
|
try:
|
|
ver,gitcmt=get_version('..')
|
|
v_SMXT=f'{ver} git:{gitcmt}'
|
|
except:
|
|
v_SMXT='git version failed'
|
|
|
|
txt=f'''About Swissmx:
|
|
|
|
SwissMX: {v_SMX}
|
|
SwissMXTools: {v_SMXT}
|
|
|
|
qt:{QT_VERSION_STR}
|
|
pyqtgraph:{pg.__version__}
|
|
numpy:{np.__version__}
|
|
matplotlib:{mpl.__version__}
|
|
epics:{epics.__version__}
|
|
|
|
Copyright (c) 2022 by Paul Scherrer Institute
|
|
(http://www.psi.ch)
|
|
|
|
Based on Zac great first implementation
|
|
Author Thierry Zamofing (thierry.zamofing@psi.ch)
|
|
'''
|
|
|
|
QMessageBox.about(self, "SwissMX", txt)
|
|
pass
|
|
|
|
def cb_autofocus(self):
|
|
app=QApplication.instance()
|
|
cfg=app._cfg.value(AppCfg.GEO_AUTOFOC)
|
|
geo=app._geometry
|
|
#geo.autofocus(app._camera, self.tweakers['base_z'],rng=(-1, 1), n=30,saveImg=True)
|
|
|
|
af_range=cfg['range']
|
|
n=cfg['steps']
|
|
vel=cfg['velocity']
|
|
rng=(-af_range/2, af_range/2)
|
|
#geo.autofocus(app._camera, self.tweakers['base_z'],rng, n,dlg)
|
|
af=geometry.autofocus(app._camera, self.tweakers['base_z'])
|
|
# af.run(rng, n, dlg)
|
|
#with pg.ProgressDialog('Progress', 0, n) as dlg:
|
|
af.run_continous(rng=rng, velo=vel)
|
|
|
|
def cb_find_fiducial(self):
|
|
app=QApplication.instance()
|
|
geo=app._geometry
|
|
cfg=app._cfg.value(AppCfg.GEO_FND_FID)
|
|
|
|
scl=np.linalg.norm(geo.pos2pix((.001,0))) #pix/um
|
|
sz=int(cfg['sz']*scl)
|
|
brd=int(cfg['brd']*scl)
|
|
# sz=( 84, 84); brd=( 7, 7) # zoom 001
|
|
# sz=(128, 128); brd=( 9, 9) # zoom 200
|
|
# sz=(200, 200); brd=(13, 13) # zoom 400
|
|
_log.debug(f'sz:{sz} brd:{brd}')
|
|
pos,corr=geo.find_fiducial(app._camera, sz=(sz,sz),brd=(brd,brd))
|
|
bz=self.tweakers["base_z"].get_rbv()
|
|
pos_scn=self._goImg.mapToScene(pg.Point(pos))
|
|
pos_eu=self._goTracked.mapFromScene(pos_scn)
|
|
|
|
l=.120
|
|
#go=UsrGO.Fiducial((fx-l/2, fy-l/2), (l, l), bz)
|
|
go=UsrGO.Fiducial((pos_eu.x()-l/2, pos_eu.y()-l/2), (l, l), bz)
|
|
go.sigRegionChangeFinished.connect(self.cb_fiducial_update_z)
|
|
grp=self._goTracked
|
|
grp.addItem(go)
|
|
self._moduleFixTarget._tree.setData(grp.childItems())
|
|
pass
|
|
|
|
def cb_testcode(self):
|
|
app=QApplication.instance()
|
|
cfg=app._cfg
|
|
geo=app._geometry
|
|
try:
|
|
tc=self._testCode
|
|
tc['idx']+=1
|
|
except AttributeError:
|
|
self._testCode=tc={'idx':0}
|
|
step=tc['idx']
|
|
|
|
def testMatplotlib():
|
|
import matplotlib.pyplot as plt
|
|
x=np.linspace(0.1, 2*np.pi, 41)
|
|
y=np.exp(np.sin(x))
|
|
plt.stem(x, y)
|
|
plt.show(block=False)
|
|
|
|
step=6
|
|
if step==0:
|
|
vb=self.vb
|
|
vb.autoRange(items=(self._goImg,))
|
|
elif step==1:
|
|
testMatplotlib()
|
|
elif step==2:
|
|
vb=self.vb
|
|
grp=pg.ItemGroup()
|
|
vb.addItem(grp)
|
|
obj=UsrGO.Marker((100, 100), (100, 100), mode=1)
|
|
grp.addItem(obj)
|
|
obj=UsrGO.Marker((150, 100), (50, 50), mode=1)
|
|
grp.addItem(obj)
|
|
obj=UsrGO.Marker((200, 100), (100, 100), mode=1)
|
|
grp.addItem(obj)
|
|
tc['grp']=grp
|
|
vb.autoRange(items=(obj,))
|
|
elif step==3:
|
|
grp=tc['grp']
|
|
tr=grp.transform()
|
|
# UsrGO.obj_info(tr)
|
|
tr.setMatrix(1, .2, 0,
|
|
-.2, 1, 0,
|
|
0, 0, 1)
|
|
grp.setTransform(tr)
|
|
elif step==4:
|
|
geo.autofocus(app._camera, self.tweakers['base_z'],rng=(-1, 1), n=30,saveImg=True)
|
|
elif step==5:
|
|
geo.autofocus(app._camera, self.tweakers['base_z'],rng=(-1, 1), n=10)
|
|
elif step==6:
|
|
geo.find_fiducial(app._camera)
|
|
#print(vb.childGroup.childItems())
|
|
pass
|
|
|
|
def prepare_wd_tabs_left(self):
|
|
|
|
|
|
tabs = self._wd_tabs_left
|
|
tabs.currentChanged.connect(self.cb_switch_task)
|
|
|
|
setup_tab = self._tab_setup
|
|
exp_tab = self._tab_experiment
|
|
|
|
# add layouts
|
|
#exp_tab.setLayout(QVBoxLayout())
|
|
setup_tab.setLayout(QVBoxLayout())
|
|
setup_tab.setSizePolicy(QSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding))
|
|
self._setup_toolbox = tbox = QToolBox()
|
|
tbox.setSizePolicy(QSizePolicy(QSizePolicy.Expanding, QSizePolicy.Ignored))
|
|
# tbox.setMinimumWidth(itemWidget.sizeHint().width())
|
|
|
|
setup_tab.layout().addWidget(tbox)
|
|
|
|
# -------- DAQ Methods Tabs --------
|
|
self.build_tab_module_fix_target()
|
|
#self._OLD_build_daq_methods_grid_tab()
|
|
#self._OLD_build_daq_methods_prelocated_tab() #ZAC: orig. code
|
|
#self._OLD_create_helical_widgets() #ZAC: orig. code
|
|
#self._OLD_build_daq_methods_embl_tab()
|
|
#self._OLD_build_sample_selection_tab()
|
|
|
|
# -------- group geometry calibration --------
|
|
block = QWidget()
|
|
block.setAccessibleName(TASK_SETUP_GEOMETRY_CALIB)
|
|
block.setContentsMargins(0, 0, 0, 0)
|
|
block.setLayout(QVBoxLayout())
|
|
grp = QWidget()
|
|
block.layout().addWidget(grp)
|
|
block.layout().addStretch()
|
|
grp.setLayout(QGridLayout())
|
|
tbox.addItem(block, "geometry calibration")
|
|
#grp.layout().addWidget(QLabel("feature size (µM)"), 0, 0)
|
|
#self._ppm_feature_size_spinbox = QDoubleSpinBox()
|
|
#self._ppm_feature_size_spinbox.setRange(5, 10000)
|
|
#self._ppm_feature_size_spinbox.setValue(50)
|
|
#self._ppm_feature_size_spinbox.setDecimals(0)
|
|
#self._ppm_feature_size_spinbox.setSuffix(" µM")
|
|
#grp.layout().addWidget(self._ppm_feature_size_spinbox, 0, 1)
|
|
self._btn_pix2pos = but = QPushButton("Pixel to position calibration")
|
|
but.setCheckable(True)
|
|
but.clicked.connect(self.cb_update_pix2pos)
|
|
grp.layout().addWidget(but, 0, 0)
|
|
self._btn_opt_ctr = but = QPushButton("Optical center calibration")
|
|
but.setCheckable(True)
|
|
but.clicked.connect(self.cb_update_opt_ctr)
|
|
grp.layout().addWidget(but, 1, 0)
|
|
|
|
self._wd_calib_tree=tree=pg.DataTreeWidget(data=None)
|
|
grp.layout().addWidget(tree, 2, 0)
|
|
|
|
tbox.currentChanged.connect(self.cb_switch_task)
|
|
|
|
# final stretch
|
|
# setup_tab.layout().addStretch()
|
|
exp_tab.layout().addStretch()
|
|
|
|
# Jungfrau parameters
|
|
block=QWidget()
|
|
block.setAccessibleName(TASK_JUNGFRAU_SETTINGS)
|
|
block.setContentsMargins(0, 0, 0, 0)
|
|
block.setLayout(QVBoxLayout())
|
|
# self.jungfrau = jungfrau_widget.JungfrauWidget() #ZAC: orig. code
|
|
# block.layout().addWidget(self.jungfrau) #ZAC: orig. code
|
|
block.layout().addStretch()
|
|
tbox.addItem(block, "Jungfrau Parameters")
|
|
|
|
# # -------- group beam box --------
|
|
# block = QWidget()
|
|
# block.setAccessibleName(TASK_SETUP_BEAM_CENTER)
|
|
# block.setContentsMargins(0, 0, 0, 0)
|
|
# block.setLayout(QVBoxLayout())
|
|
# grp = QWidget()
|
|
# block.layout().addWidget(grp)
|
|
# block.layout().addStretch()
|
|
# grp.setLayout(QGridLayout())
|
|
# tbox.addItem(block, "Beam Box")
|
|
# # setup_tab.layout().addWidget(grp)
|
|
#
|
|
# row = grp.layout().rowCount
|
|
#
|
|
# if self._pv_shutter is not None:
|
|
# but = QPushButton("open/close shutter")
|
|
# but.setCheckable(True)
|
|
# but.setChecked(1 == self._pv_shutter.get())
|
|
# but.clicked.connect(self.toggle_shutter)
|
|
# self._button_shutter = but
|
|
# grp.layout().addWidget(but, 0, 0, 1, 2)
|
|
# self._pv_shutter.add_callback(self.update_shutter_label)
|
|
# #else: #ZAC: orig. code
|
|
# # self._picker = bernina_pulse_picker.PickerWidget()
|
|
# # grp.layout().addWidget(self._picker, 0, 0, 1, 2)
|
|
# but = QPushButton("clear beam markers")
|
|
# but.clicked.connect(self.remove_beam_markers)
|
|
# grp.layout().addWidget(but, 1, 1)
|
|
# help = QTextBrowser()
|
|
# grp.layout().addWidget(help, 2, 0, 5, 2)
|
|
# help.setHtml(
|
|
# """<body>
|
|
# <h1>Updating Beam Mark</h1>
|
|
# <ol>
|
|
# <li>clear beam markers</li>
|
|
# <li>for each zoom level, CTRL-Click (left mouse button) the center of the beam</li>
|
|
# </ol>
|
|
# </body>
|
|
# """
|
|
# )
|
|
|
|
# # -------- group regions --------
|
|
# block = QWidget()
|
|
# block.setAccessibleName(TASK_SETUP_ROI)
|
|
# block.setContentsMargins(0, 0, 0, 0)
|
|
# block.setLayout(QVBoxLayout())
|
|
# grp = QWidget()
|
|
# block.layout().addWidget(grp)
|
|
# block.layout().addStretch()
|
|
# grp.setLayout(QGridLayout())
|
|
# tbox.addItem(block, "Regions of Interest")
|
|
# but = QPushButton("add RECT roi")
|
|
# but.clicked.connect(self.roi_add_rect)
|
|
# grp.layout().addWidget(but, 0, 0)
|
|
# but = QPushButton("add LINE roi")
|
|
# but.clicked.connect(self.roi_add_line)
|
|
# grp.layout().addWidget(but, 0, 1)
|
|
#
|
|
# # -------- group camera settings --------
|
|
# block = QWidget()
|
|
# block.setAccessibleName(TASK_SETUP_CAMERA)
|
|
# block.setContentsMargins(0, 0, 0, 0)
|
|
# block.setLayout(QVBoxLayout())
|
|
# grp = QWidget()
|
|
# block.layout().addWidget(grp)
|
|
# block.layout().addStretch()
|
|
# grp.setLayout(QGridLayout())
|
|
# tbox.addItem(block, "Camera Settings")
|
|
# but = QPushButton("remove transformations")
|
|
# but.clicked.connect(lambda bogus: self.modify_camera_transform("remove_all"))
|
|
# grp.layout().addWidget(but, 0, 0)
|
|
# but = QPushButton("remove last")
|
|
# but.clicked.connect(lambda bogus: self.modify_camera_transform("undo_last"))
|
|
# grp.layout().addWidget(but, 0, 1)
|
|
# but = QPushButton("Turn 90 CCW")
|
|
# #but.clicked.connect(
|
|
# # lambda bogus, n=camera.Transforms.ROTATE_90: self.modify_camera_transform(n)
|
|
# #) #ZAC: orig. code
|
|
# grp.layout().addWidget(but, 1, 0)
|
|
# but = QPushButton("Turn 90 CW")
|
|
# #but.clicked.connect(lambda bogus, n=camera.Transforms.ROTATE_270: self.modify_camera_transform(n)) #ZAC: orig. code
|
|
# grp.layout().addWidget(but)
|
|
# but = QPushButton("Turn 180 CCW")
|
|
# #but.clicked.connect(lambda bogus, n=camera.Transforms.ROTATE_180: self.modify_camera_transform(n)) #ZAC: orig. code
|
|
# grp.layout().addWidget(but)
|
|
# but = QPushButton("Transpose")
|
|
# #but.clicked.connect(lambda bogus, n=camera.Transforms.TRANSPOSE: self.modify_camera_transform(n)) #ZAC: orig. code
|
|
# grp.layout().addWidget(but)
|
|
# but = QPushButton("Flip L/R")
|
|
# #but.clicked.connect(lambda bogus, n=camera.Transforms.FLIP_LR: self.modify_camera_transform(n)) #ZAC: orig. code
|
|
# grp.layout().addWidget(but)
|
|
# but = QPushButton("Flip U/D")
|
|
# #but.clicked.connect(lambda bogus, n=camera.Transforms.FLIP_UD: self.modify_camera_transform(n)) #ZAC: orig. code
|
|
# grp.layout().addWidget(but)
|
|
# row = grp.layout().rowCount()
|
|
# self._label_transforms = QLabel()
|
|
# self._label_transforms.setWordWrap(True)
|
|
# self.modify_camera_transform(None)
|
|
#
|
|
# grp.layout().addWidget(self._label_transforms, row, 0, 1, 2)
|
|
|
|
def build_tab_module_fix_target(self):
|
|
#tab = self._tab_daq_method_prelocated
|
|
try:
|
|
objLst=self._goTracked.childItems()
|
|
except AttributeError:
|
|
_log.debug('NOT YET INITIALIZED')
|
|
objLst=None
|
|
|
|
self._moduleFixTarget =mft = ModuleFixTarget.WndFixTarget(self,objLst)
|
|
tab = self._tabs_daq_methods.insertTab(0,mft,'Fix Target')
|
|
mft.setAccessibleName(TASK_FIX_TARGET)
|
|
self._tabs_daq_methods.setCurrentWidget(mft) #set this as the active tabs
|
|
|
|
mft._cbType.addItems(["Fiducial", "FixTarget(12.5x12.5)", "FixTarget(23.0x23.0)", "FixTarget(test)", "Grid()", "SwissMX()", "SwissFEL()"])
|
|
#print(psi_device.shapepath.ShapePath.setup_motion.__doc__)
|
|
mft._txtParam.setToolTip('''\
|
|
additional parameter as yaml for object settings and motion generation:
|
|
full example for grid() object and motion settings:
|
|
size:[3.6,2.664],cnt:[30,22],fiducialSize:0.05,mode:6,scale:.5,ssz:[7,4],smv:[6,4],sdelay:9
|
|
further examples:
|
|
object settings:
|
|
for Grid(): size:[4,3],cnt:[30,22],fiducialSize:0.1
|
|
motion param:
|
|
for continous: cnt:2,scale:.3,dwell:100
|
|
for stop-and-go: tmove:10,twait:20
|
|
for hit-and-return: ssz:(4,6)
|
|
|
|
motion generation:
|
|
For details read: psi_device.shapepath.ShapePath.setup_motion.__doc__
|
|
mode: select one of following modes: 1,3,4,5,6
|
|
mode settings:
|
|
all: cnt:1
|
|
scale:1.
|
|
dwell:10
|
|
5: tmove:10
|
|
twait:20
|
|
6: ssz:(7,4)
|
|
smv:(6,4) (delault defined)
|
|
sdelay:9 (delault defined)
|
|
|
|
object settings:
|
|
Fiducial:
|
|
no param
|
|
FixTarget(12.5x12.5), FixTarget(23.0x23.0), FixTarget(test):
|
|
no param
|
|
Grid():
|
|
size:[30,20]
|
|
cnt:[30,22]
|
|
fiducialSize:0.1
|
|
SwissMX():
|
|
ofs:[.2,.2]
|
|
width:10
|
|
fidScl:.02
|
|
fiducial:[[.1,.1],[.1,2.7],[10.3,.1],[10.3, 2.7]]
|
|
SwissFEL():
|
|
ofs:[.2,.2]
|
|
width:10
|
|
fidScl:.02
|
|
fiducial:[[.1,.1],[.1,2.2],[10.3,.1],[10.3,2.2]]
|
|
''')
|
|
|
|
mft._btnAdd.clicked.connect(self.module_fix_target_add_obj)
|
|
mft._btnDelAll.clicked.connect(self.module_fix_target_del_all_obj)
|
|
mft._btnFit.clicked.connect(self.module_fix_target_fit_fiducial)
|
|
|
|
#tab.layout().addWidget(mft)
|
|
mft.prefixSelected.connect(lambda prefix: self._le_prefix.setText(prefix))
|
|
#mft.dataFileLoaded.connect(self._OLD_daq_method_prelocated_update_markers)
|
|
#mft.prelocatedDataUpdated.connect(self._OLD_daq_method_prelocated_update_markers)
|
|
#mft.markersDeleted.connect(self._OLD_daq_method_prelocated_remove_markers)
|
|
#mft.moveFastStageRequest.connect(self._OLD_move_fast_stage)
|
|
#self.fiducialPositionSelected.connect(self._OLD_daq_method_prelocated_set_fiducial)
|
|
#self.appendPrelocatedPosition.connect(self._OLD_daq_method_prelocated_append_data)
|
|
#self._preloc_inspect_area = QPlainTextEdit()
|
|
#tab.layout().addWidget(self._preloc_inspect_area)
|
|
|
|
|
|
def cb_switch_task(self, index=0):
|
|
stack = self._wd_stack
|
|
task = self._wd_tabs_left.currentWidget().accessibleName()
|
|
setup_task = self._setup_toolbox.currentWidget().accessibleName()
|
|
method = self._tabs_daq_methods.currentWidget().accessibleName()
|
|
|
|
_log.debug("Task switching: top_tabs={} exp.method={} setup_task={}".format(task, method, setup_task))
|
|
|
|
if task == "task_experiment":
|
|
stack.setCurrentIndex(0)
|
|
active_task = method
|
|
elif task == "task_setup":
|
|
active_task = setup_task
|
|
|
|
elif task == "task_sample_selection":
|
|
active_task = TASK_SAMPLE_SELECTION
|
|
stack.setCurrentIndex(1)
|
|
else:
|
|
_log.error("BUG: un-handled task")
|
|
QMessageBox.warning(self, "BUG unhandled task!", "unhandled task: top_tabs={} method={}".format(task, method),)
|
|
self.set_active_task(active_task)
|
|
self._status_task.setText(active_task)
|
|
|
|
def cb_update_pix2pos(self, calib):
|
|
app=QApplication.instance()
|
|
if calib:
|
|
_log.info("received new pix2pos calibration")
|
|
QMessageBox.information(self,'Pixel to position calibration','''\
|
|
- for each zoom level:
|
|
- choose a good fiducial mark
|
|
- move the stage x/y at min. 3 different locations
|
|
- click (left mouse button) on that fiducial mark
|
|
- un-click button "Pixel to position calibration"
|
|
''')
|
|
app._raw_pix2pos=dict()
|
|
else:
|
|
s=str(app._raw_pix2pos).replace('),','),\n ').replace('],','],\n')
|
|
print(s)
|
|
res=QMessageBox.question(self, "calibration", f"use calibration\n{s} ? ")
|
|
if res==QMessageBox.Yes:
|
|
geo=app._geometry
|
|
geo.update_pix2pos(app._raw_pix2pos)
|
|
app._cfg.setValue(AppCfg.GEO_PIX2POS, geo._lut_pix2pos)
|
|
del app._raw_pix2pos
|
|
|
|
def cb_update_opt_ctr(self, calib):
|
|
app=QApplication.instance()
|
|
if calib:
|
|
_log.info("received new opt_ctr calibration")
|
|
QMessageBox.information(self,'Optical center calibration','''\
|
|
- choose the biggest zoom level to start
|
|
- choose at least 3 fiducial marks
|
|
- for at least two zoom level:
|
|
- Click (left mouse button) on the fiducial marks (always in the same order)
|
|
- un-click button "Optical center calibration"
|
|
''')
|
|
|
|
app._raw_opt_ctr=dict()
|
|
else:
|
|
geo=app._geometry
|
|
ocOld=geo._opt_ctr
|
|
try:
|
|
geo.update_optical_center(app._raw_opt_ctr,True)
|
|
except BaseException as e:
|
|
_log.warning(e)
|
|
QMessageBox.warning(self, "calibration", f"rejected\n{e}")
|
|
del app._raw_opt_ctr
|
|
return
|
|
del app._raw_opt_ctr
|
|
go=self._goOptCtr
|
|
oc_sz=go.size()
|
|
go.blockSignals(True) # avoid to call cb_marker_moved
|
|
go.setPos(geo._opt_ctr-oc_sz/2)
|
|
res=QMessageBox.question(self, "calibration", f"keep new optical center\n{geo._opt_ctr} ? ")
|
|
if res==QMessageBox.Yes:
|
|
app._cfg.setValue(AppCfg.GEO_OPT_CTR, geo._opt_ctr)
|
|
else:
|
|
geo._opt_ctr=ocOld
|
|
go.setPos(geo._opt_ctr-oc_sz/2)
|
|
go.blockSignals(False) # allow to call cb_marker_moved
|
|
|
|
def cb_fiducial_update_z(self,obj,*vargs):
|
|
obj._z=self.tweakers["base_z"].get_rbv()
|
|
|
|
def module_fix_target_add_obj(self,*args,**kwargs):
|
|
mft=self._moduleFixTarget
|
|
fx=self.tweakers["fast_x"].get_rbv()
|
|
fy=self.tweakers["fast_y"].get_rbv()
|
|
bz=self.tweakers["base_z"].get_rbv()
|
|
app=QApplication.instance()
|
|
geo=app._geometry
|
|
oc=geo._opt_ctr
|
|
bm_pos=self._goBeamMarker.pos()
|
|
bm_sz=self._goBeamMarker.size()
|
|
idx=mft._cbType.currentIndex()
|
|
param=mft.get_param()
|
|
if idx==0:
|
|
#go=UsrGO.Fiducial(bm_pos+bm_sz/2-(20, 20), (40, 40),(fx,fy,bz))
|
|
l=.120
|
|
go=UsrGO.Fiducial((fx-l/2,fy-l/2), (l, l),bz)
|
|
go.sigRegionChangeFinished.connect(self.cb_fiducial_update_z)
|
|
elif idx==1:
|
|
#v=geo.pos2pix((12.5, 0));l=np.linalg.norm(v);l=12.5
|
|
sz=param.pop('size',(12.5, 12.5))
|
|
go=UsrGO.FixTargetFrame((fx-sz[0]/2,fy-sz[1]/2), sz, tpl='12.5x12.5',**param)
|
|
elif idx==2:
|
|
#v=geo.pos2pix((23, 0));l=np.linalg.norm(v)#l=23
|
|
sz=param.pop('size',(23, 23))
|
|
go=UsrGO.FixTargetFrame((fx-sz[0]/2,fy-sz[1]/2), sz, tpl='23.0x23.0',**param)
|
|
elif idx==3:
|
|
sz=param.pop('size',(.120*12, .120*8))
|
|
go=UsrGO.FixTargetFrame((fx-sz[0]/2,fy-sz[1]/2), sz, tpl='test',**param)
|
|
elif idx==4:
|
|
w,h=size=param.pop('size',(30, 20))
|
|
cnt=param.pop('cnt',(30, 22))
|
|
fiducialSize=param.pop('fiducialSize',.1)
|
|
go=UsrGO.Grid((fx-w/2,fy-h/2), size, cnt, fiducialSize,**param)
|
|
elif idx==5:
|
|
ofs=param.pop('ofs',(.2, .2))
|
|
width=param.pop('width',10)
|
|
fidScl=param.pop('fidScl',.02)
|
|
fiducial=param.pop('fiducial',((.1, .1), (.1, 2.7), (10.3, .1), (10.3, 2.7)))
|
|
fiducial=np.array(fiducial)
|
|
gp=psi_device.shapepath.GenPath(); gp.swissmx(ofs=ofs, width=width, flipy=True)
|
|
sz=gp.points.max(0)+np.array(ofs)*2
|
|
go=UsrGO.Path((fx-sz[0]/2, fy-sz[1]/2), sz, gp.points, fiducial, fidScl,**param)
|
|
elif idx==6:
|
|
ofs=param.pop('ofs',(.2, .2))
|
|
width=param.pop('width',10)
|
|
fidScl=param.pop('fidScl',.02)
|
|
fiducial=param.pop('fiducial',((.1, .1), (.1, 2.2), (10.3, .1), (10.3, 2.2)))
|
|
fiducial=np.array(fiducial)
|
|
gp=psi_device.shapepath.GenPath(); gp.swissfel(ofs=ofs, width=width,flipy=True)
|
|
sz=gp.points.max(0)+np.array(ofs)*2
|
|
go=UsrGO.Path((fx-sz[0]/2, fy-sz[1]/2), sz, gp.points, fiducial, fidScl,**param)
|
|
else:
|
|
_log.error('index not handeled')
|
|
|
|
grp=self._goTracked
|
|
grp.addItem(go)
|
|
mft._tree.setData(grp.childItems())
|
|
|
|
def module_fix_target_del_all_obj(self):
|
|
app=QApplication.instance()
|
|
geo=app._geometry
|
|
try: #remove plane fitter if it esists
|
|
del geo._fitPlane
|
|
except AttributeError:
|
|
pass
|
|
mft=self._moduleFixTarget
|
|
vb=self.vb
|
|
grp=self._goTracked
|
|
for go in grp.childItems():
|
|
vb.removeItem(go)
|
|
grp.setFlag(grp.ItemHasNoContents)
|
|
mft._tree.setData(None)
|
|
|
|
def module_fix_target_fit_fiducial(self):
|
|
mft=self._moduleFixTarget
|
|
vb=self.vb
|
|
grp=self._goTracked
|
|
cldLst=grp.childItems()
|
|
n=0
|
|
for go in cldLst:
|
|
if type(go)==UsrGO.Fiducial:
|
|
n+=1
|
|
ptFitTrf=np.ndarray((4,2)) # 4 (x,y) points to fit a transformation of a parallelogram
|
|
ptFitPlane=np.ndarray((n,3)) #n (x,y,z) points to fit a plane transformation z=ax+by+c
|
|
i=j=0
|
|
for go in reversed(cldLst):
|
|
if type(go)==UsrGO.FixTargetFrame:
|
|
if j==4:
|
|
#fid=np.array(((0.1, 0.1), (0.1, 1.1), (1.1, 0.1), (1.1, 1.1)))
|
|
fid=np.array(go._dscr['fiducial']['pos'])
|
|
sz=np.array(go._dscr['size'])
|
|
fid=fid/sz
|
|
trf=geometry.geometry.least_square_trf(ptFitTrf,fid)
|
|
#_log.debug(f'{trf}')
|
|
_log.debug('\n trf='+str(trf).replace('\n','\n '))
|
|
tr=go.transform()
|
|
tr.setMatrix(1, trf[1,0]/trf[0,0], 0,
|
|
trf[0,1]/trf[1,1], 1, 0,
|
|
0, 0, 1)
|
|
go.setTransform(tr)
|
|
go.setPos(trf[:,2])
|
|
#go.setSize((1,1))
|
|
sz=(trf[0,0],trf[1,1])
|
|
go.setSize(sz)
|
|
j=0
|
|
elif type(go)==UsrGO.Path:
|
|
if j==4:
|
|
fid=go._fiducial
|
|
sz=go.szOrig
|
|
fid=fid/sz
|
|
trf=geometry.geometry.least_square_trf(ptFitTrf, fid)
|
|
_log.debug('\n trf='+str(trf).replace('\n','\n '))
|
|
tr=go.transform()
|
|
tr.setMatrix(1, trf[1, 0]/trf[0, 0], 0,
|
|
trf[0, 1]/trf[1, 1], 1, 0,
|
|
0, 0, 1)
|
|
go.setTransform(tr)
|
|
go.setPos(trf[:, 2])
|
|
# go.setSize((1,1))
|
|
sz=(trf[0, 0], trf[1, 1])
|
|
go.setSize(sz)
|
|
j=0
|
|
elif type(go)==UsrGO.Fiducial and go.size()[0]==0.12:
|
|
if j<ptFitTrf.shape[0]:
|
|
ptFitTrf[j]=go.pos()+go.size()/2
|
|
j+=1
|
|
ptFitPlane[i]=go.ctr()
|
|
i+=1
|
|
app=QApplication.instance()
|
|
geo=app._geometry
|
|
if n>=3:
|
|
geo.least_square_plane(ptFitPlane)
|
|
|
|
def cb_progress(self,i,sz,dlg):
|
|
dlg.setValue(int(i*90/sz))
|
|
if dlg.wasCanceled():
|
|
raise AttributeError('canceled')
|
|
|
|
def daq_collect(self, **kwargs):# points, visualizer_method, visualizer_params):
|
|
'''
|
|
kwargs:
|
|
grid: grid dictionary with orig, pitch, count
|
|
trf: transformation matrix
|
|
points: list of points without transformation
|
|
pts_trf: list of points with transformation
|
|
'''
|
|
app = QApplication.instance()
|
|
cfg = app._cfg
|
|
dt_misc = cfg.value(AppCfg.DT_MISC)
|
|
|
|
#_log.critical('abort for test');return # RET
|
|
geo = app._geometry
|
|
shutter=app._shutter
|
|
fn='/tmp/shapepath'
|
|
try:
|
|
dt=app._deltatau
|
|
except AttributeError:
|
|
app._deltatau=dt=psi_device.Deltatau(app._args.sim&0x100!=0)
|
|
try:
|
|
jf=app._jungfrau
|
|
except AttributeError:
|
|
app._jungfrau=jf=psi_device.Jungfrau(app._args.sim&0x80!=0)
|
|
jf.config(**kwargs)
|
|
sp=dt._shapepath
|
|
sp.verbose=dt_misc['verbose']
|
|
|
|
sp.meta['fel_per']=dt_misc['fel_per']
|
|
sp.meta['sync_mode']=dt_misc['sync_mode']
|
|
sp.meta['sync_flag']=dt_misc['sync_flag']
|
|
|
|
#pv-monitor-func: stop monitors
|
|
if not cfg.value(AppCfg.GBL_MISC)['live_on_collect']:
|
|
pv_mon_lst=app._pv_mon_lst
|
|
for pv in pv_mon_lst:
|
|
pv.auto_monitor=False
|
|
|
|
with pg.ProgressDialog('Progress', 0, 100) as dlg:
|
|
dlg.setWindowModality(Qt.WindowModal)
|
|
#dlg.setRange(0, 0)
|
|
#dlg.setCancelButtonText("Abort Acquisition")
|
|
#dlg.canceled.connect(self.complete_daq)
|
|
#dlg.setAutoClose(True)
|
|
#dlg.show()
|
|
dlg.setLabelText("Setup Gather/Sync");dlg+=5
|
|
mode=kwargs.pop('mode',1)
|
|
sp.setup_sync(verbose=sp.verbose&0x40, timeOfs=dt_misc['time_ofs'], timeCor=dt_misc['time_cor'])
|
|
dlg.setLabelText("Download motion program");dlg+=5
|
|
|
|
use_trf=kwargs.get('use_trf', True) # do not use coordinate transformation
|
|
if not use_trf:
|
|
trf=kwargs.pop('trf')
|
|
try:
|
|
sp.setup_motion(fnPrg=fn+'.prg', mode=mode, **kwargs)
|
|
except BaseException as e:
|
|
_log.error(repr(e));return
|
|
sp.setup_gather()
|
|
if not use_trf:
|
|
kwargs['trf']=trf
|
|
try:
|
|
p=geo._fitPlane
|
|
#TODO: cleanup
|
|
if use_trf:
|
|
trf=kwargs['trf'] # grid-coord -> motor-um
|
|
#(0,0,1)*trf
|
|
# trf*'gridpos in um' -> motor pos in mm
|
|
trf2=np.asmatrix(np.identity(3))
|
|
trf2[:, :2]=trf
|
|
#p1=(0,0,1)*trf2 # =matrix([[-2376.8, 1376.8, 1. ]]) um
|
|
trf3=np.matrix( ((p[0],0,0),(p[1],0,0),(1000*p[2],0,1)) )
|
|
#(0, 0, 1)*trf2*trf3 -> z in um of gridpos(0,0)
|
|
t=(trf2*trf3)[:,0].A.ravel()
|
|
cz=f'{t[0]:+.18g}X{t[1]:+.18g}Y{t[2]:+.18g}'
|
|
else:
|
|
# X has inverted sign !
|
|
# Z is in um -> therefore the offset must be multiplied with 1000 ! Z motor has opposite sign !
|
|
#cz=f'{+p[0]:+.18g}X{-p[1]:+.18g}Y{-p[2]*1000:+.18g}'
|
|
t=p*np.array((1,1,1000))
|
|
cz=f'{t[0]:+.18g}X{t[1]:+.18g}Y{t[2]:+.18g}'
|
|
except AttributeError:
|
|
cz='0'
|
|
_log.warning('no plane fitting done. z does not move')
|
|
|
|
trf=kwargs['trf']
|
|
if use_trf:
|
|
fx=f'{trf[0, 0]:+.18g}X{trf[1, 0]:+.18g}Y{trf[2, 0]:+.18g}'
|
|
fy=f'{trf[0, 1]:+.18g}X{trf[1, 1]:+.18g}Y{trf[2, 1]:+.18g}'
|
|
else:
|
|
fx='X';fy='Y'
|
|
if _log.level==logging.DEBUG:
|
|
try: t
|
|
except NameError: t=(np.nan,np.nan,np.nan)
|
|
fn='/tmp/coord.log'
|
|
_log.debug(f'write all coordinates to {fn}')
|
|
if use_trf:
|
|
xy1=np.hstack((kwargs["points"],np.ones((kwargs["points"].shape[0],1))))
|
|
else:
|
|
xy1=np.hstack((kwargs["pts_trf"],np.ones((kwargs["pts_trf"].shape[0],1))))
|
|
pcz=np.matrix(((t[0],t[1],t[2])))*xy1.T
|
|
x_y_fx_fy_cz=np.vstack((kwargs["points"].T, kwargs["pts_trf"].T, pcz)).A
|
|
with open('/tmp/coord.log','w') as fh:
|
|
fh.write(' x y fx fy cz\n')
|
|
for i in range(x_y_fx_fy_cz.shape[1]):
|
|
fh.write('%9.7g %9.7g %9.7g %9.7g %9.7g\n'%tuple(x_y_fx_fy_cz[:,i]))
|
|
|
|
sp.setup_coord_trf(fx, fy, cz) # reset to shape path system
|
|
|
|
comm=sp.comm
|
|
if comm is None:
|
|
_log.info('simulated')
|
|
else:
|
|
comm.gpascii._cb_func=lambda i, sz:self.cb_progress(i, sz, dlg)
|
|
#_log.critical('TEMPORARY DO NOT EXECUTE PROGRAM !!!');return
|
|
if dlg.wasCanceled():
|
|
# pv-monitor-func: start monitors
|
|
if not cfg.value(AppCfg.GBL_MISC)['live_on_collect']:
|
|
for pv in pv_mon_lst:
|
|
pv.auto_monitor=True
|
|
return
|
|
dlg.setLabelText("Homing and get ready");dlg+=5
|
|
sp.homing() # homing if needed
|
|
sp.run() # start motion program
|
|
sp.wait_armed() # wait until motors are at first position
|
|
shutter.open()
|
|
time.sleep(1.1)
|
|
num_pts=kwargs['num_pts']
|
|
jf.acquire(num_pts)
|
|
sp.trigger(1.0) # send a start trigger (if needed) after given time
|
|
_log.info('start trigger sent')
|
|
if dt._comm is None:
|
|
dlg.setLabelText("run motion/acquisition (simulated)")
|
|
dlg.setMaximum(num_pts)
|
|
for p in range(0,num_pts,1+int(num_pts/100)):
|
|
#_log.info(f'progress {p}/{num_pts}')
|
|
dlg.setValue(p)
|
|
time.sleep(.1)
|
|
else:
|
|
dlg.setLabelText("run motion/acquisition")
|
|
dlg.setMaximum(num_pts)
|
|
while True:
|
|
p=int(sp.progress())
|
|
_log.info(f'progress {p}/{num_pts}')
|
|
if p<0:
|
|
break
|
|
elif dlg.wasCanceled():
|
|
dt._comm.gpascii.send_block('&1a;Gather.Enable=0')
|
|
break
|
|
#_log.info(f'progress {p}/{sp.points.shape[0]}')
|
|
dlg.setValue(max(p,num_pts))
|
|
time.sleep(1.) #wait 1 sec instead of .1...hopefully less segmentation faults
|
|
if not dlg.wasCanceled():
|
|
jf.gather_upload()
|
|
dlg.setLabelText("upload gather data")
|
|
sp.gather_upload(fnRec=fn+'.npz')
|
|
if dt_misc['show_plots']:
|
|
dp=psi_device.shapepath.DebugPlot(sp)
|
|
dp.plot_gather(mode=11)
|
|
plt.show(block=False)
|
|
#plt.show(block=True)
|
|
_log.info('Shutter closed')
|
|
shutter.close()
|
|
# pv-monitor-func: start monitors
|
|
if not cfg.value(AppCfg.GBL_MISC)['live_on_collect']:
|
|
for pv in pv_mon_lst:
|
|
pv.auto_monitor=True
|
|
|
|
def esc_run_steps(self, steps, title, esc_state):
|
|
self._esc_state ="busy"
|
|
with pg.ProgressDialog(title, 0, len(steps)) as dlg:
|
|
for step in steps:
|
|
step()
|
|
dlg += 1
|
|
if dlg.wasCanceled():
|
|
QMessageBox.warning(self, "escape steps", "ABORTED" + title)
|
|
break
|
|
self._esc_state = esc_state
|
|
|
|
def move_gonio(self, pos):
|
|
# gonio="{'pos_mount': (0.0,0.0,0.0,0.0), 'pos_align': (0.0,0.0,0.0,0.0)}"
|
|
# positions are: fast_x,fast_y,base_x,base_z,(omega=0,det_z, no change)
|
|
|
|
app=QApplication.instance()
|
|
cfg=app._cfg
|
|
pos_gonio = cfg.value(AppCfg.DFT_POS_GONIO)
|
|
if pos_gonio is None:
|
|
msg="gonio default positions are not configured."
|
|
_log.warning(msg)
|
|
QMessageBox.warning(self, "gonio not configured", msg)
|
|
return
|
|
|
|
tw_fx = self.tweakers["fast_x"]
|
|
tw_fy = self.tweakers["fast_y"]
|
|
tw_cx = self.tweakers["base_x"]
|
|
tw_cy = self.tweakers["base_y"]
|
|
tw_cz = self.tweakers["base_z"]
|
|
try:
|
|
p_fx, p_fy, p_cx, p_cz,=pos_gonio['pos_'+pos]
|
|
except KeyError:
|
|
raise ValueError("Goniometer position *{}* is not known!!")
|
|
_log.info(f"moving goniometer {pos} to fx:{p_fx:.5g},fy:{p_fy:.5g},cx:{p_cx:.5g},cz_{p_cz:.5g}") #,ry:{p_ry:.5g}")
|
|
|
|
tw_fx.move_abs(p_fx)
|
|
tw_fy.move_abs(p_fy)
|
|
tw_cx.move_abs(p_cx)
|
|
tw_cz.move_abs(p_cz)
|
|
|
|
app_utils.assert_tweaker_positions([
|
|
(tw_fx, p_fx, 0.1),
|
|
(tw_fy, p_fy, 0.1),
|
|
(tw_cx, p_cx, 0.1),
|
|
(tw_cz, p_cz, 0.1), ], timeout=20.0,
|
|
)
|
|
|
|
def move_post_tube(self, pos):
|
|
# post_sample_tube="{'x_up': -0.2, 'y_up': 0.0, 'x_down': 0.0, 'y_down': 0.0, 'x_out_delta': 0.0, 'y_out_delta': 0.0, 'z_in': 0.0, 'z_out': 0.0}"
|
|
app=QApplication.instance()
|
|
cfg=app._cfg
|
|
|
|
pos_pst = cfg.value(AppCfg.DFT_POS_PST)
|
|
if pos_pst is None:
|
|
msg = "SwissMX *POST-SAMPLE-TUBE* configuration is incomplete!!!"
|
|
_log.warning(msg)
|
|
QMessageBox.warning(self, "post tube not configured", msg)
|
|
return
|
|
|
|
x_in_us=pos_pst['x_in_us']
|
|
y_in_us=pos_pst['y_in_us']
|
|
x_in_ds=pos_pst['x_in_ds']
|
|
y_in_ds=pos_pst['y_in_ds']
|
|
x_out_d=pos_pst['x_out_delta']
|
|
y_out_d=pos_pst['y_out_delta']
|
|
z_in =pos_pst['z_in']
|
|
z_out =pos_pst['z_out']
|
|
|
|
tw_x_us = self.tweakers["tube_usx"]
|
|
tw_y_us = self.tweakers["tube_usy"]
|
|
tw_x_ds = self.tweakers["tube_dsx"]
|
|
tw_y_ds = self.tweakers["tube_dsy"]
|
|
tw_z = self.tweakers["tube_z"]
|
|
|
|
tandem_twv = float(self._post_tandem_tweak_val.text())
|
|
|
|
_log.info(f"move post sample tube {pos}")
|
|
if pos == "in":
|
|
tw_x_us.move_abs(x_in_us)
|
|
tw_y_us.move_abs(y_in_us)
|
|
tw_x_ds.move_abs(x_in_ds)
|
|
tw_y_ds.move_abs(y_in_ds)
|
|
try:
|
|
app_utils.assert_tweaker_positions([
|
|
(tw_x_us, x_in_us, 0.1),
|
|
(tw_y_us, y_in_us, 0.1),
|
|
(tw_x_ds, x_in_ds, 0.1),
|
|
(tw_y_ds, y_in_ds, 0.1), ],timeout=20.0,
|
|
)
|
|
except app_utils.PositionsNotReached as e:
|
|
msg=f"failed to move post sample tube {pos}: {e}"
|
|
_log.warning(msg)
|
|
QMessageBox.warning(self, "failed to move post sample tube", msg,)
|
|
raise
|
|
tw_z.move_abs(z_in, wait=True)
|
|
try:
|
|
app_utils.assert_tweaker_positions([(tw_z, z_in, 0.1)])
|
|
except app_utils.PositionsNotReached as e:
|
|
msg=f"failed to move post sample tube {pos}: {e}"
|
|
_log.warning(msg)
|
|
QMessageBox.warning(self, "failed to move post sample tube", msg,)
|
|
raise
|
|
elif pos == "out":
|
|
tw_z.move_abs(z_out, wait=True)
|
|
try:
|
|
app_utils.assert_tweaker_positions([(tw_z, z_out, 0.1)])
|
|
except app_utils.PositionsNotReached as e:
|
|
msg=f"failed to move post sample tube {pos}: {e}"
|
|
_log.warning(msg)
|
|
QMessageBox.warning(self, "failed to move post sample tube", msg,)
|
|
raise
|
|
x_out_us=x_in_us+x_out_d; tw_x_us.move_abs(x_out_us)
|
|
y_out_us=y_in_us+y_out_d; tw_y_us.move_abs(y_out_us)
|
|
x_out_ds=x_in_ds+x_out_d; tw_x_ds.move_abs(x_out_ds)
|
|
y_out_ds=y_in_ds+y_out_d; tw_y_ds.move_abs(y_out_ds)
|
|
try:
|
|
app_utils.assert_tweaker_positions([
|
|
(tw_x_us, x_out_us, 0.1),
|
|
(tw_y_us, y_out_us, 0.1),
|
|
(tw_x_ds, x_out_ds, 0.1),
|
|
(tw_y_ds, y_out_ds, 0.1), ], timeout=20.0,
|
|
)
|
|
except app_utils.PositionsNotReached as e:
|
|
msg=f"failed to move post sample tube {pos}: {e}"
|
|
_log.warning(msg)
|
|
QMessageBox.warning(self, "failed to move post sample tube", msg,)
|
|
raise
|
|
elif pos == "x+":
|
|
tw_x_us.move_rel(tandem_twv)
|
|
tw_x_ds.move_rel(tandem_twv)
|
|
elif pos == "x-":
|
|
tw_x_us.move_rel(-tandem_twv)
|
|
tw_x_ds.move_rel(-tandem_twv)
|
|
elif pos == "y+":
|
|
tw_y_us.move_rel(tandem_twv)
|
|
tw_y_ds.move_rel(tandem_twv)
|
|
elif pos == "y-":
|
|
tw_y_us.move_rel(-tandem_twv)
|
|
tw_y_ds.move_rel(-tandem_twv)
|
|
else:
|
|
raise ValueError("post sample tube *{}* is not known!!")
|
|
|
|
def move_detector(self, pos):
|
|
# detector="{'pos_in': -0.1, 'pos_out': 0.0}"
|
|
app=QApplication.instance()
|
|
cfg=app._cfg
|
|
pos_det = cfg.value(AppCfg.DFT_POS_DET)
|
|
if pos_det is None:
|
|
msg="detector default positions are not configured."
|
|
_log.warning(msg)
|
|
QMessageBox.warning(self, "post tube not configured", msg)
|
|
return
|
|
|
|
det_z = self.tweakers["det_z"]
|
|
try:
|
|
p_z=pos_det['pos_'+pos]
|
|
except KeyError:
|
|
raise ValueError("Collimator position *{}* is not known!!")
|
|
_log.info(f"moving detector {pos} to {p_z:.5g}")
|
|
det_z.move_abs(p_z, assert_position=True)
|
|
|
|
def move_collimator(self, pos):
|
|
# collimator="{'x_in': 0.0, 'y_in': -0.1, 'x_out_delta': 0.0, 'y_out_delta': 0.0}"
|
|
app=QApplication.instance()
|
|
cfg=app._cfg
|
|
cx = self.tweakers["colli_x"]
|
|
cy = self.tweakers["colli_y"]
|
|
|
|
pos_col = cfg.value(AppCfg.DFT_POS_COL)
|
|
if pos_col is None:
|
|
msg="COLLIMATOR configuration is incomplete!"
|
|
_log.warning(msg)
|
|
QMessageBox.warning(self, "post tube not configured", msg)
|
|
return
|
|
|
|
x_in = pos_col['x_in']
|
|
y_in = pos_col['y_in']
|
|
x_out = pos_col['x_out']
|
|
y_out = pos_col['y_out']
|
|
|
|
if pos == "out":
|
|
x_pos=x_out
|
|
y_pos=y_out
|
|
elif pos == "in":
|
|
x_pos=x_in
|
|
y_pos=y_in
|
|
elif pos == "ready":
|
|
x_pos=x_in
|
|
y_pos=y_out
|
|
else:
|
|
raise ValueError("Collimator position *{}* is not known!!")
|
|
_log.info(f"moving collimator {pos} to X,Y = {x_pos:.3f}, {y_pos:.3f}")
|
|
cx.move_abs(x_pos, assert_position=True)
|
|
cy.move_abs(y_pos, assert_position=True)
|
|
|
|
|
|
|
|
|
|
if __name__=="__main__":
|
|
def main():
|
|
#_log.info(f'Version: pyqtgraph:{pg.__version__} matplotlib:{mpl.__version__} numpy:{np.__version__} epics:{epics.__version__} qt:{QT_VERSION_STR}' )
|
|
_log.info(f'Version: pyqtgraph:{pg.__version__} epics:{epics.__version__} qt:{QT_VERSION_STR}' )
|
|
import argparse, socket
|
|
hostname=socket.gethostname()
|
|
if hostname=='ganymede':
|
|
#use EPICS locally
|
|
#os.environ['EPICS_CA_ADDR_LIST']='localhost'
|
|
#use EPICS if connected to ESC network
|
|
os.environ['EPICS_CA_ADDR_LIST'] ='129.129.244.255 sf-saresc-cagw.psi.ch:5062 sf-saresc-cagw.psi.ch:5066'
|
|
|
|
#(h, t)=os.path.split(sys.argv[0]);cmd='\n '+(t if len(h)>20 else sys.argv[0])+' '
|
|
#exampleCmd=('', '-m0xf -v0')
|
|
epilog=__doc__ #+'\nExamples:'+''.join(map(lambda s:cmd+s, exampleCmd))+'\n'
|
|
parser=argparse.ArgumentParser(epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter)
|
|
parser.add_argument('-l', '--log', type=int, help='debug(1)|info(2)|warn(3)|error(4) default=%(default)d', default=2)
|
|
parser.add_argument("--sim", "-s", type=lambda x: int(x,0), help="simulate devices (see bitmasks) default=0x%(default)x", default=0)
|
|
args = parser.parse_args()
|
|
_log.info('Arguments:{}'.format(args.__dict__))
|
|
if args.log!=2:
|
|
for l in ('app_config', 'app_utils', 'backlight', 'camera', 'epics_widgets',
|
|
'geometry', 'illumination', 'ModuleFixTarget', 'MXMotion',
|
|
'pbtools', 'psi_device', 'pyqtUsrObj', 'shapepath', 'swissmx', 'zoom',):
|
|
try:
|
|
logging.getLogger(l).setLevel(args.log*10)
|
|
except BaseException as e:
|
|
_log.error(f'failed set log level ({l}): {e}')
|
|
if args.log<2: listLoggers()
|
|
|
|
from PyQt5.QtWidgets import QApplication
|
|
|
|
# set app icon
|
|
|
|
app = QApplication(sys.argv)
|
|
app_icon = QtGui.QIcon()
|
|
app_icon.addFile("artwork/logo/16x16.png", QtCore.QSize(16, 16))
|
|
app_icon.addFile("artwork/logo/24x24.png", QtCore.QSize(24, 24))
|
|
app_icon.addFile("artwork/logo/32x32.png", QtCore.QSize(32, 32))
|
|
app_icon.addFile("artwork/logo/48x48.png", QtCore.QSize(48, 48))
|
|
app_icon.addFile("artwork/logo/256x256.png", QtCore.QSize(256, 256))
|
|
app.setWindowIcon(app_icon)
|
|
|
|
startupWin=StartupSplash()
|
|
|
|
#for i in range(1, 20):
|
|
# startupWin.set(i,f'sample startup text {i}')
|
|
# #app.processEvents()
|
|
# t = time.time()
|
|
# #while time.time() < t + 0.1:
|
|
# # app.processEvents()
|
|
|
|
#app._settings=QSettings("PSI", "SwissMX")
|
|
|
|
app._args=args
|
|
|
|
app._pv_mon_lst=list() #pv-monitor-func: list of all pv that are monitored and that should
|
|
# be turned on/off during data collection to avoid (hopefully) segmentation fault
|
|
|
|
startupWin.set(5, f'load settings')
|
|
app._cfg=cfg=AppCfg()
|
|
|
|
app._geometry=geometry.geometry()
|
|
app._geometry._lut_pix2pos=cfg.value(cfg.GEO_PIX2POS)
|
|
|
|
app._geometry._opt_ctr=cfg.value(cfg.GEO_OPT_CTR)
|
|
|
|
startupWin.set(15, f'connect backlight')
|
|
pfx=app._cfg.value(AppCfg.GBL_DEV_PREFIX)[0]
|
|
dft_pos_bklgt=app._cfg.value(AppCfg.DFT_POS_BKLGT)
|
|
|
|
if args.sim&0x01:
|
|
app._backlight = backlight.Backlight(None,dft_pos_bklgt)
|
|
else:
|
|
app._backlight = backlight.Backlight(f'{pfx}:MOT_BLGT',dft_pos_bklgt)
|
|
startupWin.set(20, f'connect illumination')
|
|
if args.sim&0x02:
|
|
app._illumination = illumination.IlluminationControl(None)
|
|
else:
|
|
app._illumination = illumination.IlluminationControl()
|
|
startupWin.set(40, f'connect zoom')
|
|
if args.sim&0x04:
|
|
app._zoom = zoom.QopticZoom(None)
|
|
else:
|
|
app._zoom = zoom.QopticZoom()
|
|
startupWin.set(60, f'connect camera')
|
|
if args.sim&0x08:
|
|
app._camera = camera.epics_cam(None)
|
|
app._camera.sim_gen(mode=1)
|
|
else:
|
|
app._camera = camera.epics_cam()
|
|
|
|
if args.sim&0x80:
|
|
app._shutter = psi_device.Shutter(0)
|
|
else:
|
|
app._shutter = psi_device.Shutter(1)
|
|
|
|
#app._camera.run() is called in center_piece_update
|
|
|
|
startupWin.set(60, f'start main window')
|
|
|
|
app._mainWnd=wnd=WndSwissMx()
|
|
wnd.show()
|
|
startupWin._wnd.finish(wnd)
|
|
|
|
# needed so pycharm can restart application
|
|
signal.signal(signal.SIGINT, sigint_handler)
|
|
|
|
#main.update_user_and_storage() #ZAC: orig. code
|
|
sys.exit(app.exec_())
|
|
main()
|