#!/usr/bin/env python # *-----------------------------------------------------------------------* # | | # | Copyright (c) 2022 by Paul Scherrer Institute (http://www.psi.ch) | # | Based on Zac great first implementation | # | Author Thierry Zamofing (thierry.zamofing@psi.ch) | # *-----------------------------------------------------------------------* """ SwissMx experiment application. Based on Zac great first implementation bitmask for simulation: 0x001: backlight 0x002: illumination 0x004: zoom 0x008: camera 0x010: EPICS motors Deltatau 0x020: EPICS motors SmarAct 0x040: shutter 0x080: Jungfrau 0x100: Deltatau motion code """ import logging,time,os,sys,socket class col: d='\033[0m' # default r='\033[31m' # red g='\033[32m' # green y='\033[33m' # yellow rr='\033[91m' # red(bright) gg='\033[92m' # green(bright) yy='\033[93m' # yellow(bright) b='\033[1m' # bold u='\033[4m' # underline def listLoggers(): rootlogger=logging.getLogger() print(rootlogger) for h in rootlogger.handlers: print(' %s'%h) for nm, lgr in logging.Logger.manager.loggerDict.items(): print('+ [%-20s] %s '%(nm, lgr)) if not isinstance(lgr, logging.PlaceHolder): for h in lgr.handlers: print(' %s'%h) class logHandler(logging.StreamHandler): def __init__(self): logging.StreamHandler.__init__(self) def emit(self, record): '''override function of base class''' try: msg=self.format(record) # print(record.__dict__) if record.levelno<=10: c=col.g elif record.levelno<=20: c=col.y elif record.levelno<=30: c=col.yy elif record.levelno<=40: c=col.r else: c=col.rr+col.b msg=c+msg+col.d stream=self.stream stream.write(msg+self.terminator) self.flush() except RecursionError: raise except Exception: self.handleError(record) logging.basicConfig(level=logging.INFO, format='%(levelname)s:%(module)s:%(lineno)d:%(funcName)s:%(message)s', handlers=[logHandler()]) _log = logging.getLogger("swissmx") if __name__=="__main__": if socket.gethostname()=='ganymede': base=os.path.abspath(os.path.dirname(__file__)) sys.path.insert(0, os.path.abspath(os.path.join(base,'../PBSwissMX/python'))) sys.path.insert(0, os.path.abspath(os.path.join(base,'../../PBTools'))) else: base=os.path.abspath(os.path.dirname(__file__)) sys.path.insert(0, os.path.join(base,'PBTools')) sys.path.insert(0, os.path.join(base,'PBSwissMX/python')) class timestamp(): def __init__(self): self.t=time.time() def log(self,txt): t=time.time() print(txt+f'{t-self.t:6.3g}') self.t=t ts=timestamp() ts.log('Import part 1/8:') import sys, os import json, re import signal, subprocess import matplotlib as mpl import matplotlib.pyplot as plt mpl.use('Qt5Agg') # needed to avoid blocking of ui ! TASK_JUNGFRAU_SETTINGS = "jungfrau_settings" TASK_SETUP_GEOMETRY_CALIB = "geometry_calib" TASK_SETUP_PPM_CALIBRATION_TBOX = "ppm_calibration_tbox" TASK_SETUP_BEAM_CENTER = "beamcenter_calibration" TASK_SETUP_CAMERA = "setup_camera" TASK_SETUP_ROI = "setup_rois" TASK_SAMPLE_SELECTION = "task_sample_selection" TASK_SCREENING = "screening" TASK_FIX_TARGET = "fix_trg" TASK_GRID = "grid" TASK_PRELOCATED = "prelocated" TASK_HELICAL = "helical" TASK_EMBL = "embl" ts.log('Import part 2/8:') import ModuleFixTarget ts.log('Import part 3/8:') import qtawesome import qutilities from PyQt5 import QtCore, QtGui from PyQt5.QtCore import Qt, pyqtSlot, QSize, pyqtSignal, QObject, QRectF,QT_VERSION_STR from PyQt5.QtGui import QKeySequence, QPixmap from PyQt5.QtWidgets import ( QAction, QApplication, QFileDialog, QGridLayout, QHBoxLayout, QLabel, QLineEdit, QMessageBox, QProgressBar, QPushButton, QShortcut, QSizePolicy, QSplashScreen, QToolBox, QVBoxLayout, QWidget,) from PyQt5.uic import loadUiType ts.log('Import part 4/8:') import pyqtUsrObj as UsrGO from epics_widgets.MotorTweak import MotorTweak from epics_widgets.SimMotorTweak import SimMotorTweak ts.log('Import part 5/8:') import numpy as np np.set_printoptions(suppress=True,linewidth=196) import pyqtgraph as pg import pyqtgraph.exporters # use antialias for draw lines, interpret image data as row-major instead of col-major pg.setConfigOptions(antialias=True,imageAxisOrder='row-major') ts.log('Import part 6/8:') import app_utils from app_config import AppCfg,WndParameter #settings, option, toggle_option import epics import camera,backlight,zoom,illumination,geometry ts.log('Import part 7/8:') import psi_device ts.log('Import part 8/8:') def tdstamp(): return time.strftime("%Y%m%dH%H%M%S") def datestamp(): return time.strftime("%Y%m%d") def get_version(path='.'): #sys.stdout.write('getVersion() -> using git command -> ') p = subprocess.Popen(f'git -C {path} describe --match ''*.*.*'' --long --tags --dirty', shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) retval = p.wait() res=p.stdout.readline() p.stdout.close() #res=res.decode()[1:-1].split('-',2) res=res.decode()[:-1].split('-',2) ver='.'.join(res[:2]) gitcmt=res[2][1:] return (ver,gitcmt) def sigint_handler(*args): """Handler for the SIGINT signal.""" app=QApplication.instance() app.quit() class AcquisitionAbortedException(Exception): pass class Sequencer(QObject): finished = pyqtSignal() timeoutExpired = pyqtSignal() errorHappened = pyqtSignal(str) def __init__(self, steps): super().__init__() self.steps = steps def run_sequence(self): num_steps = len(self.steps) for n, step in enumerate(self.steps): _log.info("runing step {}/{}".format(n, num_steps)) step() _log.info("emiting finished") self.finished.emit() class StartupSplash: def __init__(self): splash_pix = QPixmap("artwork/logo/256x256.png") self._wnd=splash = QSplashScreen(splash_pix, Qt.WindowStaysOnTopHint) splash.setWindowFlags(Qt.WindowStaysOnTopHint | Qt.FramelessWindowHint) splash.setEnabled(False) self._prgsBar=prgs=QProgressBar(splash) prgs.setMaximum(100) prgs.setGeometry(0, splash_pix.height() - 50, splash_pix.width(), 20) splash.show() def set(self,i,msg): self._prgsBar.setValue(i) self._wnd.showMessage(f"{msg}", int(Qt.AlignBottom|Qt.AlignCenter), Qt.black) app=QApplication.instance() app.processEvents() time.sleep(.1) Ui_MainWindow, QMainWindow = loadUiType(os.path.join(os.path.dirname(__file__),"swissmx.ui")) class WndSwissMx(QMainWindow, Ui_MainWindow): sigNewCamImg = pyqtSignal() # index in self._grids #pixelsPerMillimeter = pyqtSignal(float) #beamCameraCoordinatesChanged = pyqtSignal(float, float) addGridRequest = pyqtSignal(float, float) #zoomChanged = pyqtSignal(float) #folderChanged = pyqtSignal(str) prefixChanged = pyqtSignal(str) projectChanged = pyqtSignal(str) gridUpdated = pyqtSignal(int) # index in self._grids gonioMoveRequest = pyqtSignal(float, float, float, float, float) #fast_x_position = pyqtSignal(float) #fast_y_position = pyqtSignal(float) #fast_dx_position = pyqtSignal(float) #fast_dy_position = pyqtSignal(float) fiducialPositionSelected = pyqtSignal(float, float, float, float) # camx, camy, gonx, gony appendPrelocatedPosition = pyqtSignal(float, float, float, float) appendPrelocatedFiducial = pyqtSignal(bool, float, float, float, float) increaseRunNumberRequest = pyqtSignal() daqAborted = pyqtSignal() def __init__(self,): super(WndSwissMx, self).__init__() self.setupUi(self) app=QApplication.instance() if os.getenv("DEVELOPMENT_VERSION"): title = "[DEVELOPMENT] SwissMX - Solid support scanning for FEL sources" else: title = "SwissMX - Solid support scanning for FEL sources" self.setWindowTitle(title) self._do_quit = False path=os.path.join(os.path.dirname(__file__),"fonts") qtawesome.load_font("material", "MaterialIcons-Regular.ttf", "MaterialIcons-Regular.json", path) QtGui.QFontDatabase.addApplicationFont("fonts/Inconsolata-Bold.ttf") QtGui.QFontDatabase.addApplicationFont("fonts/Baloo-Regular.ttf") self.init_settings() # self.create_escape_toolbar() self.tweakers = {} self.setup_sliders() self.init_graphics() self._esc_state ="Maintenance" self._pin_mounting_offset = 0.0 self._mouse_tracking = False self._rois = [] # add a generic ellipse # pen = QtGui.QPen(QColor(Qt.yellow)) # pen.setWidth(5) # ellipse1 = QtWidgets.QGraphicsEllipseItem(650, 700, 500, 300) # ellipse1.setFlag(QGraphicsItem.ItemIsSelectable) # ellipse1.setFlag(QGraphicsItem.ItemIsMovable) # ellipse1.setPen(pen) # self.vb.addItem(ellipse1) # self.vb.addItem(Grid(pos=(650, 700), size=(100, 200), cell_size=(3, 4), parent=self)) # self.vb.addItem(Retangulo(pos=(750, 700), size=(100, 200), pen=pg.fn.mkPen(color=[230, 150, 200], width=4.7))) # self.vb.addItem(pg.PolyLineROI([[80, 60], [90, 30], [60, 40]], pen=(6,9), closed=True)) # test # x2 = np.linspace(-100, 100, 1000) # data2 = np.sin(x2) / x2 # # p8 = self.glw.addPlot(row=1, col=0, title="Region Selection") # plitem = pg.PlotItem() # plitem.plot(data2, pen=(255, 255, 255, 200)) # # lr = pg.LinearRegionItem([400, 700]) # # lr.setZValue(-10) # self.vb.addItem(plitem) self._message_critical_fault = QLabel(None) self._message_critical_fault.setAccessibleName("device_fault") self.statusbar.insertWidget(0, self._message_critical_fault) self._lb_coords=lbl=QLabel(None) #f=QFont('monospace', 10) #lbl.setFont(f) self.statusbar.addPermanentWidget(lbl) self._fel_status = QLabel(None) self._fel_status.setAccessibleName("fel_status_statusbar") self.statusbar.addPermanentWidget(self._fel_status) #swissfel.statusStringUpdated.connect(self._fel_status.setText) #ZAC: orig. code self._status_task = QLabel(None) self._status_task.setAccessibleName("status_task_label") self.statusbar.addPermanentWidget(self._status_task) #self.add_beam_marker() #self._beam_markers = {} # ppm calibration #self._zoom_to_ppm = {} #self._ppm_click = None self.load_stylesheet() #self.folderChanged.connect(lambda t: print("folder now: {}".format(t))) # FIXME self.init_actions() #self.prepare_microscope_page() self.prepare_wd_tabs_left() #self.update_beam_marker(qoptic_zoom.get_sp()) #ZAC: orig. code self._wd_stack.setCurrentIndex(0) self._wd_stack.currentChanged.connect(self.cb_update_center_widget) #self._OLD_init_validators() #self.init_settings_tracker() ? not needed, was for TELL ? #self._OLD_wire_storage() self.cb_update_center_widget(0) # start camera updater curzoom = app._zoom.get_val() _log.debug(f"starting app with zoom at {curzoom}") self.cb_zoom_changed(curzoom) self._tabs_daq_methods.currentChanged.connect(self.cb_switch_task) self.cb_switch_task() def init_settings(self): app = QApplication.instance() cfg = app._cfg keys = cfg.allKeys() #if EXPERIMENT_PGROUP not in keys: # self.update_user_and_storage() if cfg.WINDOW_GEOMETRY in keys: self.restoreGeometry(cfg.value(cfg.WINDOW_GEOMETRY, "")) self.restoreState(cfg.value(cfg.WINDOW_STATE, "")) if cfg.WINDOW_SPLITTER in keys: sz = [int(s) for s in cfg.value(cfg.WINDOW_SPLITTER)] else: sz=(500, 1200) self._wd_splitter.setSizes(sz) #if AppCfg.BEAM_MARKER_POSITIONS in keys: # self._beam_markers = s.value(AppCfg.BEAM_MARKER_POSITIONS) # self.update_beam_marker_fitters() # _log.info("read beam markers {}".format(self._beam_markers)) #if AppCfg.CAMERA_TRANSFORMATIONS in keys: # app._camera.set_transformations(s.value(AppCfg.CAMERA_TRANSFORMATIONS)) #if AppCfg.CAMERA_ZOOM_TO_PPM in keys: # self._zoom_to_ppm = s.value(AppCfg.CAMERA_ZOOM_TO_PPM) # _log.info(f"{AppCfg.CAMERA_ZOOM_TO_PPM} updated: {self._zoom_to_ppm}") # self.update_ppm_fitters() def init_graphics(self): app = QApplication.instance() cfg = app._cfg geo = app._geometry self.glw = pg.GraphicsLayoutWidget() self.microscope_page.setLayout(QVBoxLayout()) self.microscope_page.layout().addWidget(self.glw) self.glw.show() self.glw.scene().sigMouseMoved.connect(self.cb_mouse_move) self.glw.scene().sigMouseClicked.connect(self.cb_mouse_click) #--- viewbox --- self.vb=vb=self.glw.addViewBox(invertY=False,border='r',enableMenu=True) #TODO: vb.enableAutoRange(enable=True), vb.autoRange() does not work for ItemGroups #therefore set the vieweRange manually pad=10 vb.setRange(QRectF(-1200-pad,-1000-pad,1200+2*pad,1000+2*pad)) vb.setAspectLocked(True) vb.setBackgroundColor((120, 90, 90)) tr=QtGui.QTransform() # prepare ImageItem transformation: opt_ctr=app._geometry._opt_ctr #--- image group --- # uses image transformation # contains image and opticalcenter self._goImgGrp=grp=pg.ItemGroup() self.vb.addItem(grp) trf=cfg.value(AppCfg.GEO_CAM_TRF) # be aware: setTransform is transposed! # Qt uses: p'=(p.T*A.T).T , but I am used: p'=A*p, with p=[x,y,1].T tr.setMatrix(trf[0,0],trf[1,0],trf[2,0], #(-1, 0, 0, trf[0,1],trf[1,1],trf[2,1], # 0,-1, 0, trf[0,2],trf[1,2],trf[2,2]) # 0, 0, 1) grp.setTransform(tr) # assign transform #--- image --- self._goImg=img=pg.ImageItem() #border=pg.mkPen('r',width=2)) grp.addItem(img) #--- opctical center ---- oc_sz=np.array((50,50)) #self._goOptCtr=obj=UsrGO.Marker(-opt_ctr+oc_sz/2, oc_sz,mode=1) self._goOptCtr=obj=UsrGO.Marker(opt_ctr-oc_sz/2, oc_sz,mode=1, movable=False) obj.sigRegionChangeFinished.connect(self.cb_marker_moved) grp.addItem(obj) #--- grid --- try: self._goGrid=grid=pg.GridItem(pen=(0,255,0),textPen=(0,255,0)) #green grid and labels except NameError: _log.debug('workaround for typo in pyqtgraph:0.11.0') from PyQt5.QtGui import QPen,QColor self._goGrid=grid=pg.GridItem() # green grid and labels grid.opts['pen']=QPen(QColor(0, 255, 0)) grid.opts['textPen']=QPen(QColor(0, 255, 0)) #tr.reset() #grid.setTransform(tr) # assign transform vb.addItem(grid) #--- fixed group --- # uses pix2pos transformation with a fixed fx,fy value =(0,0) # contains beam marker self._goFixGrp=grp=pg.ItemGroup() geo.interp_zoom(1) pix2pos=geo._pix2pos A=np.asarray(pix2pos.I) tr=grp.transform() p1=np.hstack((opt_ctr,1)) #position of optical center on image item p2=np.matmul(trf, p1) #position of optical center on viewbox tr.setMatrix(A[0,0], A[0,1], 0, A[1,0], A[1,1], 0, p2[0], p2[1], 1) # translate dx,dy grp.setTransform(tr) self.vb.addItem(grp) #--- beam marker --- size_eu=cfg.value(AppCfg.GEO_BEAM_SZ)/1000 # convert from um to mm pos_eu=cfg.value(AppCfg.GEO_BEAM_POS)/1000 # convert from um to mm self._goBeamMarker=obj=UsrGO.Marker(pos_eu-size_eu/2,size_eu,mode=0,movable=False) obj.sigRegionChangeFinished.connect(self.cb_marker_moved) #bm.setTransform(tr) # assign transform grp.addItem(obj) #--- testing scan grid --- #self.track_objects() # first call is needed to initialize the structure self._goTracked # #go=UsrGO.Grid((120, -100), (200, 150), (30, 22), 2) # go=UsrGO.Grid((120, -100), (1000, 500), (10, 5), 2) # go.setTransform(tr) # assign transform # vb.addItem(go) # self._goTracked['objLst'].append(go) # self.track_objects() #tracking now the objects # #UsrGO.obj_tree(vb) def init_actions(self): app = QApplication.instance() cfg = app._cfg self.toolBar.setToolButtonStyle(Qt.ToolButtonTextUnderIcon) self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.SHIFT + Qt.Key_S), self) self.shortcut.activated.connect(self.cb_save_cam_image) self.shortcut = QShortcut(QKeySequence(Qt.Key_F2), self) self.shortcut.activated.connect(self.cb_save_cam_image) self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.Key_S), self) self.shortcut.activated.connect(self.cb_save_cam_image) for k in range(10): qkey = k + Qt.Key_0 self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.SHIFT + qkey), self) self.shortcut.activated.connect(lambda key=k: self.saveBookmark(key)) self.shortcut = QShortcut(QKeySequence(Qt.CTRL + qkey), self) self.shortcut.activated.connect(lambda key=k: self.gotoBookmark(key)) self._button_collect.clicked.connect(self.cb_execute_collection) # Toolbar buttons icon_size = QSize(50, 50) icon = qtawesome.icon("material.photo_camera") action = QAction(icon, "Save Original", self) action.setToolTip("(Ctrl+Shift+S) Take a screenshot of the sample image, without markers. Saves in current folder.") action.triggered.connect(self.cb_save_cam_image) self.toolBar.addAction(action) action = QAction(icon, "Save View", self) action.setToolTip("(Ctrl+S) Take a screenshot of the currently visible sample image, including markers. Saves in current folder.") action.triggered.connect(lambda: self.cb_save_cam_image(True)) self.toolBar.addAction(action) if os.getenv("DEVELOPMENT_VERSION"): icon = qtawesome.icon("material.style") action = QAction(icon, "Reload Stylesheet", self) action.triggered.connect(self.load_stylesheet) self.toolBar.addAction(action) # icon = qtawesome.icon("material.shutter_speed") #action = QAction("Use Pulse Picker", self) #action.setCheckable(True) #action.setChecked(cfg.option(AppCfg.ACTIVATE_PULSE_PICKER)) #action.triggered.connect(lambda: cfg.toggle_option(AppCfg.ACTIVATE_PULSE_PICKER)) #self.toolBar.addAction(action) #icon = qtawesome.icon("material.timeline") #action = QAction(icon, "Add Line", self) #action.triggered.connect(self._OLD_roi_add_line) #self.toolBar.addAction(action) #icon = qtawesome.icon("material.tab_unselected") #action = QAction(icon, "Add Rect", self) #action.triggered.connect(self._OLD_roi_add_rect) #self.toolBar.addAction(action) #icon = qtawesome.icon("material.play_for_work") #action = QAction(icon, "TELL Mount", self) #action.setToolTip("BookMark(0) => Move stages CZ, CX, FX, FY, and Omega to Tell sample changer mount position") #action.triggered.connect(self._OLD_escape_goToTellMountPosition) #self.toolBar.addAction(action) icon = qtawesome.icon("material.play_for_work") action = QAction(icon, "Auto\nFocus", self) action.triggered.connect(self.cb_autofocus) self.toolBar.addAction(action) action = QAction(icon, "Find\nFiducial", self) action.triggered.connect(self.cb_find_fiducial) self.toolBar.addAction(action) action = QAction(icon, "Test\nCode", self) action.triggered.connect(self.cb_testcode) self.toolBar.addAction(action) self.toolBar.addWidget(qutilities.horiz_spacer()) icon = qtawesome.icon("material.sync") action = QAction(icon, "Sample\nExchange", self) action.setToolTip("Move devices so a sample can be exchanged.") action.setObjectName("action_SampleExchange") action.triggered.connect(self.cb_esc_sample_exchange) self.toolBar.addAction(action) self.toolBar.widgetForAction(action).setAccessibleName("action_SampleExchange") icon = qtawesome.icon("material.my_location") action = QAction(icon, "Sample\nAlignment", self) action.setToolTip("Move devices so a sample can be aligned.") action.triggered.connect(self.cb_esc_sample_alignment) self.toolBar.addAction(action) self.toolBar.widgetForAction(action).setAccessibleName("action_SampleAlignment") icon = qtawesome.icon("material.fingerprint") action = QAction(icon, "Data\nCollection", self) action.setToolTip("Move devices so a sample can be collected.") action.triggered.connect(self.cb_esc_data_collection) self.toolBar.addAction(action) self.toolBar.widgetForAction(action).setAccessibleName("action_DataCollection") self.actionQuit.triggered.connect(self.cb_really_quit) self.actionPreferences.triggered.connect(self.cb_modify_app_param) self.actionHome_Fast_Stages.triggered.connect(self.cb_deltatau_home_faststages) self.actionAbout.triggered.connect(self.cb_about) #self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.SHIFT + Qt.Key_L), self) #self.shortcut.activated.connect(self._OLD_roi_add_line) #self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.SHIFT + Qt.Key_R), self) #self.shortcut.activated.connect(self._OLD_roi_add_rect) #self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.SHIFT + Qt.Key_M), self) #self.shortcut.activated.connect(self._OLD_toggle_mouse_tracking) #self.shortcut = QShortcut(QKeySequence(Qt.Key_F11), self) #self.shortcut.activated.connect(self._OLD_toggle_maximized) #self.shortcut = QShortcut(QKeySequence(Qt.Key_F12), self) #self.shortcut.activated.connect(self._OLD_show_window_configuration) #self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.Key_B), self) #self.shortcut.activated.connect(lambda: self._OLD_beammark.toggle_handle()) #self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.Key_P), self) #self.shortcut.activated.connect(self._OLD_camera_pause_toggle) # adding a menu entry to one of the menus #action = QAction("parameters", self) #action.setToolTip("modify application parameters") #action.setStatusTip("modify application parameters") #action.triggered.connect(self.cb_modify_app_param) #self.menuAdvanced.addAction(action) #action = QAction("geometry", self) #action.setToolTip("Update optical center, beam marker size etc.") #action.setStatusTip("Update optical center, beam marker size etc.") #action.triggered.connect(lambda x: cfg.dlg_geometry(self)) #self.menuAdvanced.addAction(action) #action = QAction("Backlight Reference Positions", self) #action.setToolTip("Update the reference positions for the backlight") #action.setStatusTip("Update the reference positions for the backlight") #action.triggered.connect(cfg.dlg_backlight_positions) #self.menuAdvanced.addAction(action) #action = QAction("Collimator Reference Positions", self) #action.setToolTip("Update the reference positions for the collimator") #action.setStatusTip("Update the reference positions for the collimator") #action.triggered.connect(cfg.dlg_collimator_reference_positions) #self.menuAdvanced.addAction(action) #action = QAction("Post sample tube Reference Positions", self) #action.setToolTip("Update the reference positions for the post tube") #action.setStatusTip("Update the reference positions for the post tube") #action.triggered.connect(cfg.dlg_posttube_references) #self.menuAdvanced.addAction(action) #action = QAction("Delta Tau Parameters", self) #action.setToolTip("Parameters affecting the Delta Tau") #action.setStatusTip("Parameters affecting the Delta Tau") #action.triggered.connect(cfg.dlg_deltatau_parameters) #self.menuAdvanced.addAction(action) #action = QAction("Cryojet Reference Positions", self) #action.setToolTip("Update the reference positions for the cryojet nozzle position") #action.setStatusTip("Update the reference positions for the cryojet nozzle position") #action.triggered.connect(self.set_cryojet_positions_dialog) #self.menuAdvanced.addAction(action) #action = QAction("Tell Mount Positions", self) #action.setToolTip("Mount positions for TELL sample changer") #action.setStatusTip("Parameters affecting the Delta Tau") #action.triggered.connect(cfg.dlg_tell_mount_positions) #self.menuAdvanced.addAction(action) def closeEvent(self, event): #overloaded function """this is called when the user clicks the window's cross icon""" if (self._do_quit or QMessageBox.question( self, "", "Are you sure you want to quit?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No,) == QMessageBox.Yes): app=QApplication.instance() cfg=app._cfg try: pv=app._camera._pv['pic'] pv.clear_auto_monitor() # disconnect PV monitor callback -> program exit faster. except AttributeError: _log.warning('disconnect PV callback failed.') _log.info('disconnect PV callback') cfg.setValue(cfg.WINDOW_GEOMETRY, self.saveGeometry()) cfg.setValue(cfg.WINDOW_STATE, self.saveState()) cfg.setValue(cfg.WINDOW_SPLITTER, self._wd_splitter.sizes()) cfg.setValue('last_active', time.time()) _log.info('save settings') #QMainWindow.closeEvent(self, event) _log.info('closeEvent done') else: event.ignore() def cb_update_center_widget(self, index): _log.debug('TODO: check to not connect smultiple igNewCamImg') if index > 0: # not showing camera image _log.warning("listening to zescape") pass #self.timer.stop() #zescape.start_listening() #self.timer = QtCore.QTimer(self) #self.timer.timeout.connect(self.check_zescape) #self.timer.start(20) else: app=QApplication.instance() try: self.cb_new_frame_sim() except AttributeError: self.sigNewCamImg.connect(self.cb_update_img) cam=app._camera cfg=app._cfg param=cam.get_param() paramSv=cfg.value(AppCfg.GEO_CAM_PARAM) if not paramSv: cfg.setValue(AppCfg.GEO_CAM_PARAM, param) else: for k,v in param.items(): if v!=paramSv[k]: if type(v)==tuple: if v==tuple(paramSv[k]): continue cam.set_param(**paramSv) #cfg.setValue(AppCfg.GEO_CAM_PARAM, param) #break _log.debug(f'diff:{k},{v} != {paramSv[k]}') cam.run(self.cb_new_frame_pv) app._pv_mon_lst.append(cam._pv['pic']) def cb_new_frame_pv(self, **kwargs): try: #pv-monitor-func #thrd=threading.current_thread() #_log.debug(f'thread:{thrd.getName()}, {thrd.native_id}') #_log.debug(f"{kwargs['timestamp']}") app=QApplication.instance() cam=app._camera sz=cam._sz if kwargs['count']==sz[0]*sz[1]: pic=kwargs['value'].reshape(sz[::-1]) else: sz=app._camera.update_size() pic=kwargs['value'].reshape(sz[::-1]) if pic.dtype==np.int16: pic.dtype=np.uint16 camera.epics_cam.set_fiducial(pic, 255) cam._pic=pic cam._timestamp=kwargs['timestamp'] try: cam.process() except AttributeError as e: pass # self._goImg.setImage(cam._pic) caused some deadlocks. # therefore try to update the image with signals instead self.sigNewCamImg.emit() except Exception as e: _log.critical(f'{e}') def cb_update_img(self): #thrd=threading.current_thread() #_log.debug(f'thread:{thrd.getName()}, {thrd.native_id}') app=QApplication.instance() cam=app._camera self._goImg.setImage(cam._pic) #vb.setRange(QRectF(-1300,-1100,1400,1200)) #force a segmentation fault #try: # dbgSegFault=self._dbgSegFault #except AttributeError as e: # dbgSegFault=self._dbgSegFault=0 #_log.warning(f'dbgSegFault:{dbgSegFault}') #if dbgSegFault>100: # _log.critical('Xforce a segmentation fault') # import ctypes # ctypes.string_at(0) #self._dbgSegFault+=1 def cb_new_frame_sim(self, **kwargs): app=QApplication.instance() sim=app._camera._sim imgSeq=sim['imgSeq'] idx=sim['imgIdx'] sim['imgIdx']=(idx+1) imgIdx=idx%imgSeq.shape[0] # _log.info('simulated idx:{}'.format(idx)) pic=imgSeq[imgIdx] self._goImg.setImage(pic) delay=500 # ms -> 2fps QtCore.QTimer.singleShot(delay, self.cb_new_frame_sim) #force a segmentation fault #_log.warning(f'imgIdx:{imgIdx},idx:{idx}') #if idx==20: # _log.critical('force a segmentation fault') # import ctypes # ctypes.string_at(0) def load_stylesheet(self): with open(os.path.join(os.path.dirname(__file__),"swissmx.css"), "r") as sheet: self.setStyleSheet(sheet.read()) def setup_sliders(self): cont = self._wd_right self._tweak_container = QWidget() self._tweak_container.setLayout(QVBoxLayout()) cont.layout().addWidget(self._tweak_container) layout = self._tweak_container.layout() layout.setSpacing(0) layout.setContentsMargins(0, 0, 0, 0) self.zoombox = zoom.Zoom() self.zoombox.init_settings() self.zoombox.zoomChanged.connect(self.cb_zoom_changed) self.zoombox.moveBacklight.connect(self.cb_move_backlight_safe) layout.addWidget(self.zoombox) toolbox = QToolBox() layout.addWidget(toolbox) self.build_group_faststage(toolbox) # self.build_slits_group(toolbox) self.build_group_collimator(toolbox) self.build_group_posttube(toolbox) self.build_group_xeye(toolbox) #self.build_cryo_group(toolbox) # monitor all axis for an axis fault for key, tweaker in self.tweakers.items(): tweaker.event_axis_fault.connect(self.cb_axis_fault) #self.tweakers["fast_x"].event_readback.connect(lambda alias, value, kw: self.fast_x_position.emit(value)) #self.tweakers["fast_y"].event_readback.connect(lambda alias, value, kw: self.fast_y_position.emit(value)) self.tweakers["fast_x"].event_rbv.connect(lambda rec_name, kw: self.track_objects()) self.tweakers["fast_y"].event_rbv.connect(lambda rec_name, kw: self.track_objects()) # layout.addStretch() def cb_marker_moved(self,obj,*args,**kwargs): _log.debug(args) _log.debug(kwargs) app=QApplication.instance() cfg=app._cfg geo=app._geometry opt_ctr=geo._opt_ctr if obj==self._goOptCtr: oc_pos=obj.pos() oc_sz=obj.size() geo._opt_ctr=opt_ctr=np.array(oc_pos+oc_sz/2) if (QMessageBox.question(self, "save config", f"save optical center position {opt_ctr} permanently?", QMessageBox.Yes|QMessageBox.No, QMessageBox.No, )==QMessageBox.Yes): cfg.setValue(AppCfg.GEO_OPT_CTR,opt_ctr) elif obj==self._goBeamMarker: bm_pos=obj.pos() bm_sz=obj.size() pos=np.array(bm_pos+bm_sz/2)*1000 # convert from mm to um if (QMessageBox.question(self, "save config", f"save beam marker position {pos} permanently?", QMessageBox.Yes|QMessageBox.No, QMessageBox.No, )==QMessageBox.Yes): cfg.setValue(AppCfg.GEO_BEAM_POS,pos) self.track_objects() def cb_zoom_changed(self, value): #self.zoomChanged.emit(value) app=QApplication.instance() geo=app._geometry try: geo.interp_zoom(value) except AttributeError as e: _log.warning(e) else: grp=self._goFixGrp opt_ctr=geo._opt_ctr A=np.asarray(geo._pix2pos.I) #trf=cfg.value(AppCfg.GEO_CAM_TRF) tr=self._goImgGrp.transform() p=tr.map(opt_ctr[0],opt_ctr[1]) tr.setMatrix(A[0, 0], A[0, 1], 0, A[1, 0], A[1, 1], 0, p[0], p[1], 1) # translate dx,dy grp.setTransform(tr) #bm=self._goBeamMarker #bm_sz=np.abs(geo.pos2pix(bm._size_eu)) #bm_pos=-opt_ctr-geo.pos2pix(bm._pos_eu)-bm_sz/2 #bm.blockSignals(True) # avoid to call cb_marker_moved #bm.setPos(bm_pos,finish=False) #bm.setSize(bm_sz) #bm.blockSignals(False) #_log.debug(f"zoom->{value} beam marker pos:{bm_pos} sz:{bm_sz})") self.track_objects() def cb_move_backlight_safe(self, pos): app=QApplication.instance() bl=app._backlight # any move of backlight requires post sample tube out self.move_post_tube("out") bl.move(pos) def get_tweaker(self, rec, mtype=0, **kwargs): app = QApplication.instance() sim=app._args.sim if mtype == 0: # default epics motor record if sim&0x10: m_tw=SimMotorTweak() else: m_tw=MotorTweak() elif mtype==1: # smaract motor record if sim&0x20: m_tw=SimMotorTweak() else: m_tw=MotorTweak() m_tw.connect_motor(rec, **kwargs) self.tweakers[kwargs['alias']] = m_tw return m_tw def build_post_tube_tandem_tweaker(self): block = QWidget() block.setAccessibleName("post tube tamden") block.setContentsMargins(0, 0, 0, 0) block.setLayout(QVBoxLayout()) c = QWidget() glay = QGridLayout() c.setLayout(glay) b = QWidget() blay = QHBoxLayout() b.setLayout(blay) blay.addWidget(QLabel("Tweak (mm)")) self._post_tandem_tweak_val = QLineEdit("1.0") blay.addWidget(self._post_tandem_tweak_val) glay.addWidget(b, 0, 0, 1, 2) icon = qtawesome.icon("material.arrow_back") but = QPushButton(icon, "") but.clicked.connect(lambda m, pos="x+": self.move_post_tube(pos)) glay.addWidget(but, 1, 0) # c.setLayout(glay) icon = qtawesome.icon("material.arrow_forward") but = QPushButton(icon, "") but.clicked.connect(lambda m, pos="x-": self.move_post_tube(pos)) glay.addWidget(but, 1, 1) # c.setLayout(glay) icon = qtawesome.icon("material.arrow_downward") but = QPushButton(icon, "") but.clicked.connect(lambda m, pos="y-": self.move_post_tube(pos)) glay.addWidget(but, 2, 0) # c.setLayout(glay) icon = qtawesome.icon("material.arrow_upward") but = QPushButton(icon, "") but.clicked.connect(lambda m, pos="y+": self.move_post_tube(pos)) glay.addWidget(but, 2, 1) block.layout().addWidget(c) return block def cb_axis_fault(self, pvname, kw): """ swissmx - {'pvname': 'SAR-EXPMX:MOT_FY.STAT', 'value': 0, 'char_value': 'NO_ALARM', 'status': 0, 'ftype': 17, 'chid': 38980392, 'host': 'SAR-CPPM-EXPMX1.psi.ch:5064', 'count': 1, 'access': 'read-only', 'write_access': False, 'read_access': True, 'severity': 0, 'timestamp': 1537867290.914189, 'precision': None, 'units': None, 'enum_strs': ('NO_ALARM', 'READ', 'WRITE', 'HIHI', 'HIGH', 'LOLO', 'LOW', 'STATE', 'COS', 'COMM', 'TIMEOUT', 'HWLIMIT', 'CALC', 'SCAN', 'LINK', 'SOFT'), 'upper_disp_limit': None, 'lower_disp_limit': None, 'upper_alarm_limit': None, 'lower_alarm_limit': None, 'lower_warning_limit': None, 'upper_warning_limit': None, 'upper_ctrl_limit': None, 'lower_ctrl_limit': None, 'nelm': 1, 'type': 'time_enum', 'typefull': 'time_enum', 'motor_field': 'STAT', 'source_field': 'STAT', 'cb_info': (1, ), 'alias': 'fast Y'} :param pvname: :param kw: :return: """ _log.debug(f"AXIS FAULT: {kw}") if kw["severity"]: msg = f"axis {pvname} has a fault" _log.critical(msg) _log.critical(f"{kw}") else: msg = "" self._message_critical_fault.setText(msg) @staticmethod def pv_monitor_add(widgets): #pv-monitor-func # this is a function to search for a issue that crashes the application # during data acquisition #_log.info('modify monitors') app=QApplication.instance() pv_mon_lst=app._pv_mon_lst try: for w in widgets[:-1]: #ignore last item for pv in w._motor._pvs.values(): pv_mon_lst.append(pv) # for k,pv in w._motor._pvs.items(): # pv.auto_monitor=False # for k,pv in w._motor._pvs.items(): # pv.auto_monitor=True except AttributeError as e: if type(w)==SimMotorTweak: _log.warning(f'{e},{w},{w._motor._rec_name}') else: _log.warning(f'{e},{w},{pv}') sim=app._args.sim assert sim&0x10,'assuming simulated motors' def build_group_faststage(self, toolbox): pfx=QApplication.instance()._cfg.value(AppCfg.GBL_DEV_PREFIX)[0] c=QWidget() c.setLayout(QGridLayout()) f=lambda v: lambda x:self.move_detector(v) #needs nested lambda to work ... for i,v in enumerate(('in','out')): but=QPushButton('detector '+v) but.clicked.connect(f(v)) c.layout().addWidget(but, 0, i) widgets=[ # self.get_tweaker('f"{pfx}:MOT_BLGT', alias='backlight', label='backlight'), self.get_tweaker(f"{pfx}:MOT_FY", alias="fast_y", label="fast Y"), self.get_tweaker(f"{pfx}:MOT_FX", alias="fast_x", label="fast X"), self.get_tweaker(f"{pfx}:MOT_CY", alias="base_y", label="base Y"), self.get_tweaker(f"{pfx}:MOT_CX", alias="base_x", label="base X"), self.get_tweaker(f"{pfx}:MOT_CZ", alias="base_z", label="base Z"), self.get_tweaker(f"{pfx}:MOT_DET_Z", alias="det_z", label="detector Z"), c ] qutilities.add_item_to_toolbox(toolbox,"Fast Stage",widget_list=widgets) WndSwissMx.pv_monitor_add(widgets) def build_group_collimator(self, toolbox): pfx=QApplication.instance()._cfg.value(AppCfg.GBL_DEV_PREFIX)[1] _log.warning('TODO: cleanup PV names!!!') pfx='SARES30-SMX' c=QWidget() c.setLayout(QGridLayout()) f=lambda v: lambda x:self.move_collimator(v) #needs nested lambda to work ... for i,v in enumerate(('in','out','ready')): but=QPushButton(v) but.clicked.connect(f(v)) c.layout().addWidget(but, 0, i) widgets = [ self.get_tweaker(f"{pfx}:MCS1", alias="colli_x", label="colli X", mtype=1,), self.get_tweaker(f"{pfx}:MCS2", alias="colli_y", label="colli Y", mtype=1,), c, ] qutilities.add_item_to_toolbox(toolbox,"Collimator",widget_list=widgets) WndSwissMx.pv_monitor_add(widgets) def build_group_posttube(self, toolbox): pfx=QApplication.instance()._cfg.value(AppCfg.GBL_DEV_PREFIX)[1] _log.warning('TODO: cleanup PV names!!!') pfx='SARES30-SMX' widgets = [ self.get_tweaker(f"{pfx}:MCS4", alias="tube_usx", label="upstream X", mtype=1,), self.get_tweaker(f"{pfx}:MCS6", alias="tube_usy", label="upstream Y", mtype=1,), self.get_tweaker(f"{pfx}:MCS5", alias="tube_dsx", label="downstream X", mtype=1,), self.get_tweaker(f"{pfx}:MCS7", alias="tube_dsy", label="downstream Y", mtype=1,), self.get_tweaker(f"{pfx}:MCS8", alias="tube_z", label="tube Z", mtype=1), ] c = QWidget() c.setLayout(QGridLayout()) but = QPushButton("post tube out") but.clicked.connect(lambda m, pos="out": self.move_post_tube(pos)) c.layout().addWidget(but, 0, 0) but = QPushButton("post tube in") but.clicked.connect(lambda m, pos="in": self.move_post_tube(pos)) c.layout().addWidget(but, 0, 1) widgets.append(c) label = "Post Sample Tube" block = QWidget() block.setAccessibleName(label) block.setContentsMargins(0, 0, 0, 0) block.setLayout(QVBoxLayout()) for w in widgets: block.layout().addWidget(w) block.layout().addWidget(self.build_post_tube_tandem_tweaker()) block.layout().addStretch() toolbox.addItem(block, label) def build_group_xeye(self, toolbox): pfx=QApplication.instance()._cfg.value(AppCfg.GBL_DEV_PREFIX)[1] _log.warning('TODO: cleanup PV names!!!') pfx='SARES30-SMX' widgets=[ self.get_tweaker(f"{pfx}:MCS14", alias="xeye_x", label="X", mtype=1), self.get_tweaker(f"{pfx}:MCS15", alias="xeye_y", label="Y", mtype=1), ] qutilities.add_item_to_toolbox(toolbox,"X-Ray Eye",widget_list=widgets) WndSwissMx.pv_monitor_add(widgets) def set_active_task(self, task): _log.info("TASK == {}".format(task)) self._active_task = task def active_task(self): return self._active_task def cb_mouse_move(self, pos): app = QApplication.instance() geo = app._geometry #pos = pixel position on the widget task = self.active_task() z = app._zoom.get_val() bm=self._goBeamMarker #pos=event.scenePos() pImg=pg.Point(self._goImg.mapFromScene(pos)) pTrk=pg.Point(self._goTracked.mapFromScene(pos)) pFix=pg.Point(self._goFixGrp.mapFromScene(pos)) pFix-=bm.pos()+bm.size()/2 fx=self.tweakers["fast_x"].get_val() fy=self.tweakers["fast_y"].get_val() pRel=pTrk-(fx,fy) #s=f'pImg{pImg} pTrk{pTrk} bm._pos_eu{bm._pos_eu}' try: pln=geo._fitPlane except AttributeError: cz=np.nan else: cz=pln[0]*pTrk[0]+pln[1]*pTrk[1]+pln[2] # z=ax+by+c s=\ f'img pix ({pImg[0]:0.1f} {pImg[1]:0.1f})px \u23A2 ' \ f'stage ({pTrk[0]:0.4f} {pTrk[1]:>0.4f} {cz:>0.4f})mm \u23A2 ' \ f'dist to beam ({pFix[0]:>0.4f} {pFix[1]:>0.4f})mm ' #f'dist to beam ({pRel[0]:>0.6g} {pRel[1]:>0.6g}mm) ' #_log.debug(s) self._lb_coords.setText(s) #self._lb_coords.setText( # "\u23FA{:>}:{:>6.0f} {:<.0f}" # "\u23FA{:>}:{:>6.0f} {:<.0f}" # "\u23FA{:>}:{:<.0f}" # "\u23FA{:>}:{:>6.1f} {:<.1f} \u00B5" # "\u23FA{:>}:{:>7.3f} {:<.3f} mm".format( # "Beam at", bx, by, # "Pixel coord ", x, y, # "PPM", ppm, # "Distance to beam", 1000 * dx, 1000 * dy, # "Stage", fx, fy, # ) #) def cb_mouse_click_geometry_calib(self, event): app=QApplication.instance() zoom = app._zoom.get_val() fast_x=self.tweakers["fast_x"];fast_y=self.tweakers["fast_y"] if self._btn_pix2pos.isChecked(): try: raw=app._raw_pix2pos[zoom] except KeyError as e: raw=app._raw_pix2pos[zoom]=list() imgPos=self._goImg.mapFromScene(event.scenePos()) fx=fast_x.get_rbv(); fy=fast_y.get_rbv() x=imgPos.x();y=imgPos.y() _log.debug('append calib: zoom:{} stage_x_y:{}/{} cam_pix_x_y:{}/{}'.format(zoom, fx, fy, x, y)) raw.append((fx, fy, x, y)) self._wd_calib_tree.setData(app._raw_pix2pos) elif self._btn_opt_ctr.isChecked(): try: raw=app._raw_opt_ctr[zoom] except KeyError as e: raw=app._raw_opt_ctr[zoom]=list() imgPos=self._goImg.mapFromScene(event.scenePos()) x=imgPos.x();y=imgPos.y() _log.debug('append calib: zoom:{} cam_pix_x_y:{}/{}'.format(zoom, x, y)) raw.append(( x, y)) self._wd_calib_tree.setData(app._raw_opt_ctr) else: QMessageBox.warning(self, "calibration", "no calibration started yet.\nPress 'pix2pos' or 'opt_ctr' button first") def cb_mouse_click(self, event): #_log.debug("{}".format(event)) #_log.debug("screen pos {}".format(event.screenPos())) #pixel position on the whole screen #_log.debug("scene pos {}".format(event.scenePos())) #pixel position on the scene (including black frame) #_log.debug(" pos {}".format(event.pos())) #pixel position of the ckicked object mapped to its coordinates #p=event.scenePos() #_log.debug(f"vb pos {self.vb.mapFromScene(p)}") #pixel position on the scene (including black frame) #for o in self.vb.childGroup.childItems(): # _log.debug(f"{type(o)} pos {o.mapFromScene(p)}") #pixel position on the scene (including black frame) #_log.debug(f"currentItem:{event.currentItem}") app=QApplication.instance() mod=event.modifiers() btn=event.button() task = self.active_task() if task==TASK_SETUP_GEOMETRY_CALIB: if btn==Qt.LeftButton and not mod&Qt.ControlModifier: self.cb_mouse_click_geometry_calib(event) return ################################ # Ctrl-left : move to position # Ctrl-right : object tree ################################ #dblclick = event.double() if btn==Qt.LeftButton: if mod&Qt.ShiftModifier: pass elif mod&Qt.ControlModifier: pos=event.scenePos() _log.debug(f'move to position :scene pos {pos}') geo=app._geometry bm=self._goBeamMarker pImg=pg.Point(self._goImg.mapFromScene(pos)) pTrk=pg.Point(self._goTracked.mapFromScene(pos)) pFix=pg.Point(self._goFixGrp.mapFromScene(pos)) pFix-=bm.pos()+bm.size()/2 #create trace self.add_camera_trace(pFix) _log.debug(f'dist to beam ({pFix[0]:>0.6g} {pFix[1]:>0.6g}mm)') fx_motor=self.tweakers["fast_x"] fy_motor=self.tweakers["fast_y"] fx_motor.move_rel(pFix[0]) fy_motor.move_rel(pFix[1]) #fx_motor.move_abs(fx_motor.get_val()+pFix[0]) #fy_motor.move_abs(fy_motor.get_val()+pFix[1]) #if none of the image is visible, recenter it r1=self.vb.viewRect() r2=self.vb.itemBoundingRect(self._goImg) if not r1.intersects(r2): r1.translate(r2.center()-r1.center()) self.vb.setRange(r1) try: pln=geo._fitPlane except AttributeError: pass else: cz_motor=self.tweakers["base_z"] cz=pln[0]*pTrk[0]+pln[1]*pTrk[1]+pln[2] #z=ax+by+c if cz: cz_motor.move_abs(cz) else: _log.critical(f'{cz} {pTrk} {pln}') _log.debug(f'cz={pln[0]:.4g}*{pTrk[0]:.4g}+{pln[1]:.4g}*{pTrk[1]:.4g}+{pln[2]:.4g}={cz}') pass elif mod&Qt.AltModifier: pass else: pass elif btn==Qt.RightButton: if mod&Qt.ShiftModifier: pass elif mod&Qt.ControlModifier: _log.debug('object tree') UsrGO.obj_tree(self.vb) pass elif mod&Qt.AltModifier: pass else: pass return return assert(event.currentItem==self.vb) #UsrGO.obj_info(self.img) event.pos()# return Point(self.currentItem.mapFromScene(self._scenePos)) app=QApplication.instance() if event.button() == Qt.RightButton: UsrGO.obj_info(self.vb.childTransform()) event.ignore() return pos = event.scenePos() zoom_level = app._zoom.get_val() _log.debug(" zoom {}".format(zoom_level)) try: ppm = self.ppm_fitter(zoom_level) except: ppm = 1E3 dblclick = event.double() shft = Qt.ShiftModifier & event.modifiers() ctrl = Qt.ControlModifier & event.modifiers() alt = Qt.AltModifier & event.modifiers() xy = self._goImg.mapFromScene(pos) x, y = xy.x(), xy.y() bx, by = self.get_beam_mark_on_camera_xy() a = np.asarray([x, y]) b = np.asarray([bx, by]) pixel_dist = np.linalg.norm(a - b) _log.debug("distance to beam: {:.1f} pixels".format(pixel_dist)) omega = omega_motor.get_position() _log.debug("omega at {:.03f} degrees".format(omega)) fx = fx_motor.get_position() fy = fy_motor.get_position() dx = (x - bx) / ppm dy = -(y - by) / ppm fx += dx fy += dy _log.debug("click at : {:.3f} {:.3f}".format(x, y)) _log.debug("beam at : {:.3f} {:.3f}".format(bx, by)) _log.debug("gonio at : {:.3f} {:.3f}".format(fx, fy)) # publisher.submit( # {"event": "gonio", "data": self.get_gonio_positions(as_json=True)} # ) _log.debug("TASK = {}".format(task)) # Click without modifers moves clicked position to beam mark if task not in (TASK_SETUP_PPM_CALIBRATION, TASK_SETUP_BEAM_CENTER): if not (ctrl or shft or alt): if ppm == 0: _log.warning("PPM is not defined - do the PPM calibration") if task == TASK_HELICAL: dx = (x - bx) / ppm dy = -(y - by) / ppm _log.info("moving base_X, fast_Y {:.3f}, {:.3f}".format(dx, dy)) bx_motor.move_rel(dx) fy_motor.move_rel(dy) else: dx = np.cos(omega * np.pi / 180) * (x - bx) / ppm dy = -(y - by) / ppm _log.info("moving fast x/y to {:.3f}, {:.3f}".format(dx, dy)) fx_motor.move_rel(dx) fy_motor.move_rel(dy) self.fast_dx_position.emit(dx) self.fast_dy_position.emit(dy) return # CTRL-Click always just moves Z if ctrl and not (shft or alt): mm = np.sign(y - by) * pixel_dist / 10000. _log.debug("moving Z stage by {:.3f} mm".format(mm)) bz_motor.move_rel(mm) return if task in (TASK_SETUP_PPM_CALIBRATION, TASK_SETUP_BEAM_CENTER): if ctrl and self._ppm_calibration.isChecked(): if self._ppm_click is not None: feature_dist = self._ppm_feature_size_spinbox.value() / 1000. b = self._ppm_click pixel_dist = np.linalg.norm(a - b) ppm = pixel_dist / feature_dist _log.debug("calib save a {}".format(a)) _log.debug("calib save b {}".format(b)) _log.debug(" feature {}".format(feature_dist)) _log.debug("a -> b pixels {}".format(pixel_dist)) _log.debug(" ppm({}) {}".format(zoom_level, pixel_dist)) self._zoom_to_ppm[zoom_level] = ppm settings.setValue(CAMERA_ZOOM_TO_PPM, self._zoom_to_ppm) self._ppm_click = None _log.debug("ppm data so far {}".format(self._zoom_to_ppm)) else: _log.debug("calib save b {}".format(a)) self._ppm_click = a return elif task == TASK_SETUP_BEAM_CENTER: if ctrl and not (shft or alt): _log.info("beam mark add: zoom {} => {:.1f} {:.1f}".format(zoom_level, x, y)) self.append_to_beam_markers(x, y, zoom_level) else: _log.warning("click combination not available") elif task == TASK_GRID: if shft and not (ctrl or alt): self.addGridRequest.emit(fx, fy) elif task == TASK_PRELOCATED: if ctrl and shft: omegacos = option(DELTATAU_OMEGACOS) if omegacos: fx = fx / np.cos(omega * np.pi / 180) _log.info( "selectin fiducial (omegacos): {:.3f}, {:.3f}".format(fx, fy) ) else: _log.info("selectin fiducial: {:.3f}, {:.3f}".format(fx, fy)) self.fiducialPositionSelected.emit(x, y, fx, fy) elif shft and alt: self.appendPrelocatedPosition.emit(x, y, fx, fy) elif task == TASK_HELICAL: _log.info("no action associated ") elif task == TASK_EMBL: pass else: pass def track_objects(self): #self._goTracked= dictionary with infos about tracked objects # objLst = list of objects to track # state = (zoom, fx, fy) (created at first call of self.track_objects() ) app=QApplication.instance() geo=app._geometry zoom = app._zoom.get_val() twk_fx=self.tweakers["fast_x"];twk_fy=self.tweakers["fast_y"] # TODO: get_position() is delayed as it is the RBV. do same as for the zoom # TODO: and return the last set_point fx=twk_fx.get_rbv() fy=twk_fy.get_rbv() try: grp=self._goTracked grpTrc=self._goTrace zoom_old,fx_old,fy_old=grp._state except AttributeError: # at initialization self._goTracked=grp=pg.ItemGroup() grp.setZValue(20) #higher than beam marker and opt ctr that have z=10 self.vb.addItem(grp) grp._state=(zoom,fx,fy) zoom_old=fx_old=fy_old=None #--- camera tracing images --- self._goTrace=grpTrc=pg.ItemGroup() # the transformation is the same as for tracked objects grpTrc.setZValue(-20) #lower than other objects self.vb.addItem(grpTrc) try: _log.debug(f"zoom:{zoom_old}->{zoom:d} fx:{fx_old:.3f}->{fx:.3f} fy:{fy_old:.3f}->{fy:.3f}") except TypeError: _log.debug(f"zoom:{zoom_old}->{zoom} fx:{fx_old}->{fx:.3f} fy:{fy_old}->{fy:.3f}") if zoom_old!=zoom: geo.interp_zoom(zoom) bm=self._goBeamMarker opt_ctr=geo._opt_ctr A=np.asarray(geo._pix2pos.I) # trf=cfg.value(AppCfg.GEO_CAM_TRF) tr=self._goFixGrp.transform() p1=bm.pos()+bm.size()/2+(-fx,-fy) #p=tr.map(opt_ctr[0], opt_ctr[1]) p=tr.map(p1[0], p1[1]) tr.setMatrix(A[0, 0], A[0, 1], 0, A[1, 0], A[1, 1], 0, p[0], p[1], 1) # translate dx,dy grp.setTransform(tr) grpTrc.setTransform(tr) def add_camera_trace(self,delta_mm): # add up to tracelen image when moveing motors to new positions try: tracelen=self._goTrace._tracelen except AttributeError: app=QApplication.instance() cfg=app._cfg tracelen=self._goTrace._tracelen=cfg.value(AppCfg.GBL_MISC)['img_trace_len'] if tracelen==0: return trImg=self._goImgGrp.transform() trTrk=self._goTracked.transform() tr=trImg*trTrk.inverted()[0] # TODO: check if oder correct !!! cm=pg.ColorMap((0,255),((0,0,64),(255,255,128)))#,opacity=.5) img=pg.ImageItem(self._goImg.image,lut=cm.getLookupTable(0,255)) img.setTransform(tr) self._goTrace.addItem(img) # automatically added to self.vb cldLst=self._goTrace.childItems() if len(cldLst)>tracelen: #check if one image needs to be removed r1=self.vb.itemBoundingRect(img) for c in cldLst[:-1]: #check overlap with newest image, and remove previous if it is more than 30% r2=self.vb.itemBoundingRect(c) r3=r1.intersected(r2) s3=r3.size() s1=r1.size() if s3.width()/s1.width()>0.3 and s3.height()/s1.height()>0.3: self.vb.removeItem(c) return self.vb.removeItem(cldLst[0]) #if no one was removes, remove oldest def cb_modify_app_param(self): wnd=WndParameter(self) wnd.show() @pyqtSlot() def cb_save_cam_image(self,overlays=False): app=QApplication.instance() cam=app._camera filename, _ = QFileDialog.getSaveFileName(self,"Save data file",'', 'PNG files (*.png);;all files (*)') if not filename: return _log.info(f"saving view screenshot: {filename}") #try: if not overlays: import PIL.Image #img=PIL.Image.fromarray(cam.pic.astype(np.uint8)) try: pic=cam._pic mx=pic.max() if pic.max()>255: scl=2**int(round(np.log2(mx)-8)) pic=np.array(pic/scl,dtype=np.uint8) elif pic.dtype!=np.uint8: pic=np.array(pic, dtype=np.uint8) except AttributeError: sim=app._camera._sim pic=cam._sim['imgSeq'][sim['imgIdx']] img=PIL.Image.fromarray(pic) img.save(filename) else: exporter = pg.exporters.ImageExporter(self.vb) # set export parameters if needed #exporter.parameters()["width"] = 2000 # (note this also affects height parameter) # save to file exporter.export(filename) #except Exception as e: # _log.warning(e) # QMessageBox.warning(self, "Screenshot: failed to save image", "Failed to save screenshot!") def cb_execute_collection(self): app=QApplication.instance() geo=app._geometry #zoom=app._zoom.get_val() task = self.active_task() #self._is_aborted = False #method = self._tabs_daq_methods.currentWidget().accessibleName() if task == TASK_FIX_TARGET: #self.re_connect_collect_button(callback=self.collect_abort_grid, accessibleName="grid_abort", label="Abort Grid Scan",) #self._inspect = self._grid_inspect_area #self._inspect.setPlainText("") fast_x=self.tweakers["fast_x"]; fast_y=self.tweakers["fast_y"] fx=fast_x.get_val() fy=fast_y.get_val() opt_ctr=geo._opt_ctr grp=self._goTracked cldLst=grp.childItems() for go in cldLst: if type(go)==UsrGO.Fiducial: continue t=type(go) if t not in(UsrGO.FixTargetFrame,UsrGO.Path): _log.warning(f'{t} not supported for FixTargetFrame ->skipped:{go}') continue try: param=go.get_scan_param() #points in coordinate system of ROI except AttributeError as e: _log.warning(f'no scan parameters for object->skipped:{go}') continue trf=np.asmatrix(param['trf']) p=param['points'] p=(np.hstack((p,np.ones((p.shape[0],1))))*trf).A param['pts_trf']=p # transformed points in um motor coordinates param['num_pts']=p.shape[0] vb=self.vb grp=self._goTracked mft=self._moduleFixTarget try: tmpGoLst=self._goTmp except AttributeError: tmpGoLst=self._goTmp=[] else: for go in tmpGoLst: vb.removeItem(go) tmpGoLst.clear() # adding debug fiducials #n=int(p.shape[0]/100)+1 #for i in range(0,p.shape[0],n): # fx,fy=p[i, :]/1000 # fx=-fx #X axis has inverted sign ! # l=.06 # go=UsrGO.Fiducial((fx-l/2, fy-l/2), (l, l), i) # grp.addItem(go) # tmpGoLst.append(go) #mft._tree.setData(grp.childItems()) #_log.debug(f'first point:{p[0,:]}') #_log.debug(f'step to 2nd point:{p[1,:]-p[0,:]}') #print('DEBUG: difference from point to point:') #print(np.diff(p, axis=0)) self.daq_collect(**param) elif task == TASK_GRID: self.re_connect_collect_button( callback=self.collect_abort_grid, accessibleName="grid_abort", label="Abort Grid Scan", ) self._inspect = self._grid_inspect_area self._inspect.setPlainText("") geo=app._geometry zoom=app._zoom.get_val() fast_x=self.tweakers["fast_x"]; fast_y=self.tweakers["fast_y"] fx=fast_x.get_val() fy=fast_y.get_val() opt_ctr=geo._opt_ctr grp=self._goTracked cldLst=grp.childItems() for go in cldLst: points=go.get_points() #points in coordinate system of ROI # names consists of abrevations # part 0: po=position sz=size dt=delta # part 1: px=pixel eu=engineering units (e.g. mm) po_px=go.pos() sz_px=go.size() tr=go.transform() # TODO: this is not yet used UsrGO.obj_info(tr) dt_px=-opt_ctr-po_px dt_eu=geo.pix2pos(dt_px)+(fx,fy) for i in range(points.shape[0]): points[i,:]=dt_eu+geo.pix2pos(points[i,:])#*tr method="grid" params=None #grid._grid_dimensions # params = ([grid._grid_dimensions[0]], [grid._grid_dimensions[1]]) # FIXME something wrong here< self.daq_collect_points(points, visualizer_method=method, visualizer_params=params) #self.daq_grid_collect_grid() elif task == TASK_PRELOCATED: self._inspect = self._preloc_inspect_area self._inspect.setPlainText("") self.daq_prelocated_collect_points() elif task == TASK_HELICAL: self.daq_helical_collect() elif task == TASK_EMBL: self.daq_embl_collect_points() def cb_esc_sample_exchange(self): app=QApplication.instance() steps = [] #if option(CRYOJET_MOTION_ENABLED): # steps.append(lambda: self.move_cryojet_nozzle("out")) steps=[ lambda:self.move_detector("out"), lambda: self.move_post_tube("out"), lambda: app._backlight.move("out"), lambda: self.move_collimator("out"), lambda:self.move_gonio("mount"), ] self.esc_run_steps(steps, "Transitioning to Sample Exchange","ManualSampleExchange") def cb_esc_sample_alignment(self): app=QApplication.instance() steps = [ # lambda: sample_selection.tell.set_current(30.0), lambda:self.move_detector("out"), lambda: self.move_collimator("out"), lambda:self.move_detector("out"), lambda:self.move_post_tube("out"), lambda:app._backlight.move("in"), lambda:self.move_collimator("out"), lambda:self.move_gonio("align"), ] #if option(CRYOJET_MOTION_ENABLED): # steps.extend([lambda: self.move_cryojet_nozzle("in")]) #steps.extend([lambda: self.move_post_tube("out"), lambda: app._backlight.move("in")]) self.esc_run_steps(steps, "Transitioning to Sample Alignment","SampleAlignment") def cb_esc_data_collection(self): app=QApplication.instance() steps = [ # lambda: sample_selection.tell.set_current(30.0), lambda: app._backlight.move("out"), lambda: self.move_post_tube("in"), lambda: self.move_collimator("in"), lambda:self.move_detector("in"), ] #if option(CRYOJET_MOTION_ENABLED): # steps.extend([lambda: self.move_cryojet_nozzle("in")]) self.esc_run_steps(steps, "Transitioning to Data Collection","DataCollection") def cb_really_quit(self): """called when user Ctrl-Q the app""" if QMessageBox.question(self, "", "Are you sure you want to quit?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No,) == QMessageBox.Yes: self._do_quit = True self.close() def cb_deltatau_home_faststages(self): pfx=QApplication.instance()._cfg.value(AppCfg.GBL_DEV_PREFIX)[0] _log.info("homing fast stages") epics.PV(f"{pfx}1:ASYN.AOUT").put(b"enable plc 1") def cb_about(self): try: ver,gitcmt=get_version() v_SMX=f'{ver} git:{gitcmt}' except: v_SMX='git version failed' try: ver,gitcmt=get_version('..') v_SMXT=f'{ver} git:{gitcmt}' except: v_SMXT='git version failed' txt=f'''About Swissmx: SwissMX: {v_SMX} SwissMXTools: {v_SMXT} qt:{QT_VERSION_STR} pyqtgraph:{pg.__version__} numpy:{np.__version__} matplotlib:{mpl.__version__} epics:{epics.__version__} Copyright (c) 2022 by Paul Scherrer Institute (http://www.psi.ch) Based on Zac great first implementation Author Thierry Zamofing (thierry.zamofing@psi.ch) ''' QMessageBox.about(self, "SwissMX", txt) pass def cb_autofocus(self): app=QApplication.instance() cfg=app._cfg.value(AppCfg.GEO_AUTOFOC) geo=app._geometry #geo.autofocus(app._camera, self.tweakers['base_z'],rng=(-1, 1), n=30,saveImg=True) af_range=cfg['range'] n=cfg['steps'] vel=cfg['velocity'] rng=(-af_range/2, af_range/2) #geo.autofocus(app._camera, self.tweakers['base_z'],rng, n,dlg) af=geometry.autofocus(app._camera, self.tweakers['base_z']) # af.run(rng, n, dlg) #with pg.ProgressDialog('Progress', 0, n) as dlg: af.run_continous(rng=rng, velo=vel) def cb_find_fiducial(self): app=QApplication.instance() geo=app._geometry cfg=app._cfg.value(AppCfg.GEO_FND_FID) scl=np.linalg.norm(geo.pos2pix((.001,0))) #pix/um sz=int(cfg['sz']*scl) brd=int(cfg['brd']*scl) # sz=( 84, 84); brd=( 7, 7) # zoom 001 # sz=(128, 128); brd=( 9, 9) # zoom 200 # sz=(200, 200); brd=(13, 13) # zoom 400 _log.debug(f'sz:{sz} brd:{brd}') pos,corr=geo.find_fiducial(app._camera, sz=(sz,sz),brd=(brd,brd)) bz=self.tweakers["base_z"].get_rbv() pos_scn=self._goImg.mapToScene(pg.Point(pos)) pos_eu=self._goTracked.mapFromScene(pos_scn) l=.120 #go=UsrGO.Fiducial((fx-l/2, fy-l/2), (l, l), bz) go=UsrGO.Fiducial((pos_eu.x()-l/2, pos_eu.y()-l/2), (l, l), bz) go.sigRegionChangeFinished.connect(self.cb_fiducial_update_z) grp=self._goTracked grp.addItem(go) self._moduleFixTarget._tree.setData(grp.childItems()) pass def cb_testcode(self): app=QApplication.instance() cfg=app._cfg geo=app._geometry try: tc=self._testCode tc['idx']+=1 except AttributeError: self._testCode=tc={'idx':0} step=tc['idx'] def testMatplotlib(): import matplotlib.pyplot as plt x=np.linspace(0.1, 2*np.pi, 41) y=np.exp(np.sin(x)) plt.stem(x, y) plt.show(block=False) step=6 if step==0: vb=self.vb vb.autoRange(items=(self._goImg,)) elif step==1: testMatplotlib() elif step==2: vb=self.vb grp=pg.ItemGroup() vb.addItem(grp) obj=UsrGO.Marker((100, 100), (100, 100), mode=1) grp.addItem(obj) obj=UsrGO.Marker((150, 100), (50, 50), mode=1) grp.addItem(obj) obj=UsrGO.Marker((200, 100), (100, 100), mode=1) grp.addItem(obj) tc['grp']=grp vb.autoRange(items=(obj,)) elif step==3: grp=tc['grp'] tr=grp.transform() # UsrGO.obj_info(tr) tr.setMatrix(1, .2, 0, -.2, 1, 0, 0, 0, 1) grp.setTransform(tr) elif step==4: geo.autofocus(app._camera, self.tweakers['base_z'],rng=(-1, 1), n=30,saveImg=True) elif step==5: geo.autofocus(app._camera, self.tweakers['base_z'],rng=(-1, 1), n=10) elif step==6: geo.find_fiducial(app._camera) #print(vb.childGroup.childItems()) pass def prepare_wd_tabs_left(self): tabs = self._wd_tabs_left tabs.currentChanged.connect(self.cb_switch_task) setup_tab = self._tab_setup exp_tab = self._tab_experiment # add layouts #exp_tab.setLayout(QVBoxLayout()) setup_tab.setLayout(QVBoxLayout()) setup_tab.setSizePolicy(QSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)) self._setup_toolbox = tbox = QToolBox() tbox.setSizePolicy(QSizePolicy(QSizePolicy.Expanding, QSizePolicy.Ignored)) # tbox.setMinimumWidth(itemWidget.sizeHint().width()) setup_tab.layout().addWidget(tbox) # -------- DAQ Methods Tabs -------- self.build_tab_module_fix_target() #self._OLD_build_daq_methods_grid_tab() #self._OLD_build_daq_methods_prelocated_tab() #ZAC: orig. code #self._OLD_create_helical_widgets() #ZAC: orig. code #self._OLD_build_daq_methods_embl_tab() #self._OLD_build_sample_selection_tab() # -------- group geometry calibration -------- block = QWidget() block.setAccessibleName(TASK_SETUP_GEOMETRY_CALIB) block.setContentsMargins(0, 0, 0, 0) block.setLayout(QVBoxLayout()) grp = QWidget() block.layout().addWidget(grp) block.layout().addStretch() grp.setLayout(QGridLayout()) tbox.addItem(block, "geometry calibration") #grp.layout().addWidget(QLabel("feature size (µM)"), 0, 0) #self._ppm_feature_size_spinbox = QDoubleSpinBox() #self._ppm_feature_size_spinbox.setRange(5, 10000) #self._ppm_feature_size_spinbox.setValue(50) #self._ppm_feature_size_spinbox.setDecimals(0) #self._ppm_feature_size_spinbox.setSuffix(" µM") #grp.layout().addWidget(self._ppm_feature_size_spinbox, 0, 1) self._btn_pix2pos = but = QPushButton("Pixel to position calibration") but.setCheckable(True) but.clicked.connect(self.cb_update_pix2pos) grp.layout().addWidget(but, 0, 0) self._btn_opt_ctr = but = QPushButton("Optical center calibration") but.setCheckable(True) but.clicked.connect(self.cb_update_opt_ctr) grp.layout().addWidget(but, 1, 0) self._wd_calib_tree=tree=pg.DataTreeWidget(data=None) grp.layout().addWidget(tree, 2, 0) tbox.currentChanged.connect(self.cb_switch_task) # final stretch # setup_tab.layout().addStretch() exp_tab.layout().addStretch() # Jungfrau parameters block=QWidget() block.setAccessibleName(TASK_JUNGFRAU_SETTINGS) block.setContentsMargins(0, 0, 0, 0) block.setLayout(QVBoxLayout()) # self.jungfrau = jungfrau_widget.JungfrauWidget() #ZAC: orig. code # block.layout().addWidget(self.jungfrau) #ZAC: orig. code block.layout().addStretch() tbox.addItem(block, "Jungfrau Parameters") # # -------- group beam box -------- # block = QWidget() # block.setAccessibleName(TASK_SETUP_BEAM_CENTER) # block.setContentsMargins(0, 0, 0, 0) # block.setLayout(QVBoxLayout()) # grp = QWidget() # block.layout().addWidget(grp) # block.layout().addStretch() # grp.setLayout(QGridLayout()) # tbox.addItem(block, "Beam Box") # # setup_tab.layout().addWidget(grp) # # row = grp.layout().rowCount # # if self._pv_shutter is not None: # but = QPushButton("open/close shutter") # but.setCheckable(True) # but.setChecked(1 == self._pv_shutter.get()) # but.clicked.connect(self.toggle_shutter) # self._button_shutter = but # grp.layout().addWidget(but, 0, 0, 1, 2) # self._pv_shutter.add_callback(self.update_shutter_label) # #else: #ZAC: orig. code # # self._picker = bernina_pulse_picker.PickerWidget() # # grp.layout().addWidget(self._picker, 0, 0, 1, 2) # but = QPushButton("clear beam markers") # but.clicked.connect(self.remove_beam_markers) # grp.layout().addWidget(but, 1, 1) # help = QTextBrowser() # grp.layout().addWidget(help, 2, 0, 5, 2) # help.setHtml( # """ #

Updating Beam Mark

#
    #
  1. clear beam markers
  2. #
  3. for each zoom level, CTRL-Click (left mouse button) the center of the beam
  4. #
# # """ # ) # # -------- group regions -------- # block = QWidget() # block.setAccessibleName(TASK_SETUP_ROI) # block.setContentsMargins(0, 0, 0, 0) # block.setLayout(QVBoxLayout()) # grp = QWidget() # block.layout().addWidget(grp) # block.layout().addStretch() # grp.setLayout(QGridLayout()) # tbox.addItem(block, "Regions of Interest") # but = QPushButton("add RECT roi") # but.clicked.connect(self.roi_add_rect) # grp.layout().addWidget(but, 0, 0) # but = QPushButton("add LINE roi") # but.clicked.connect(self.roi_add_line) # grp.layout().addWidget(but, 0, 1) # # # -------- group camera settings -------- # block = QWidget() # block.setAccessibleName(TASK_SETUP_CAMERA) # block.setContentsMargins(0, 0, 0, 0) # block.setLayout(QVBoxLayout()) # grp = QWidget() # block.layout().addWidget(grp) # block.layout().addStretch() # grp.setLayout(QGridLayout()) # tbox.addItem(block, "Camera Settings") # but = QPushButton("remove transformations") # but.clicked.connect(lambda bogus: self.modify_camera_transform("remove_all")) # grp.layout().addWidget(but, 0, 0) # but = QPushButton("remove last") # but.clicked.connect(lambda bogus: self.modify_camera_transform("undo_last")) # grp.layout().addWidget(but, 0, 1) # but = QPushButton("Turn 90 CCW") # #but.clicked.connect( # # lambda bogus, n=camera.Transforms.ROTATE_90: self.modify_camera_transform(n) # #) #ZAC: orig. code # grp.layout().addWidget(but, 1, 0) # but = QPushButton("Turn 90 CW") # #but.clicked.connect(lambda bogus, n=camera.Transforms.ROTATE_270: self.modify_camera_transform(n)) #ZAC: orig. code # grp.layout().addWidget(but) # but = QPushButton("Turn 180 CCW") # #but.clicked.connect(lambda bogus, n=camera.Transforms.ROTATE_180: self.modify_camera_transform(n)) #ZAC: orig. code # grp.layout().addWidget(but) # but = QPushButton("Transpose") # #but.clicked.connect(lambda bogus, n=camera.Transforms.TRANSPOSE: self.modify_camera_transform(n)) #ZAC: orig. code # grp.layout().addWidget(but) # but = QPushButton("Flip L/R") # #but.clicked.connect(lambda bogus, n=camera.Transforms.FLIP_LR: self.modify_camera_transform(n)) #ZAC: orig. code # grp.layout().addWidget(but) # but = QPushButton("Flip U/D") # #but.clicked.connect(lambda bogus, n=camera.Transforms.FLIP_UD: self.modify_camera_transform(n)) #ZAC: orig. code # grp.layout().addWidget(but) # row = grp.layout().rowCount() # self._label_transforms = QLabel() # self._label_transforms.setWordWrap(True) # self.modify_camera_transform(None) # # grp.layout().addWidget(self._label_transforms, row, 0, 1, 2) def build_tab_module_fix_target(self): #tab = self._tab_daq_method_prelocated try: objLst=self._goTracked.childItems() except AttributeError: _log.debug('NOT YET INITIALIZED') objLst=None self._moduleFixTarget =mft = ModuleFixTarget.WndFixTarget(self,objLst) tab = self._tabs_daq_methods.insertTab(0,mft,'Fix Target') mft.setAccessibleName(TASK_FIX_TARGET) self._tabs_daq_methods.setCurrentWidget(mft) #set this as the active tabs mft._cbType.addItems(["Fiducial", "FixTarget(12.5x12.5)", "FixTarget(23.0x23.0)", "FixTarget(test)", "Grid()", "SwissMX()", "SwissFEL()"]) mft._btnAdd.clicked.connect(self.module_fix_target_add_obj) mft._btnDelAll.clicked.connect(self.module_fix_target_del_all_obj) mft._btnFit.clicked.connect(self.module_fix_target_fit_fiducial) #tab.layout().addWidget(mft) mft.prefixSelected.connect(lambda prefix: self._le_prefix.setText(prefix)) #mft.dataFileLoaded.connect(self._OLD_daq_method_prelocated_update_markers) #mft.prelocatedDataUpdated.connect(self._OLD_daq_method_prelocated_update_markers) #mft.markersDeleted.connect(self._OLD_daq_method_prelocated_remove_markers) #mft.moveFastStageRequest.connect(self._OLD_move_fast_stage) #self.fiducialPositionSelected.connect(self._OLD_daq_method_prelocated_set_fiducial) #self.appendPrelocatedPosition.connect(self._OLD_daq_method_prelocated_append_data) #self._preloc_inspect_area = QPlainTextEdit() #tab.layout().addWidget(self._preloc_inspect_area) def cb_switch_task(self, index=0): stack = self._wd_stack task = self._wd_tabs_left.currentWidget().accessibleName() setup_task = self._setup_toolbox.currentWidget().accessibleName() method = self._tabs_daq_methods.currentWidget().accessibleName() _log.debug("Task switching: top_tabs={} exp.method={} setup_task={}".format(task, method, setup_task)) if task == "task_experiment": stack.setCurrentIndex(0) active_task = method elif task == "task_setup": active_task = setup_task elif task == "task_sample_selection": active_task = TASK_SAMPLE_SELECTION stack.setCurrentIndex(1) else: _log.error("BUG: un-handled task") QMessageBox.warning(self, "BUG unhandled task!", "unhandled task: top_tabs={} method={}".format(task, method),) self.set_active_task(active_task) self._status_task.setText(active_task) def cb_update_pix2pos(self, calib): app=QApplication.instance() if calib: _log.info("received new pix2pos calibration") QMessageBox.information(self,'Pixel to position calibration','''\ - for each zoom level: - choose a good fiducial mark - move the stage x/y at min. 3 different locations - click (left mouse button) on that fiducial mark - un-click button "Pixel to position calibration" ''') app._raw_pix2pos=dict() else: s=str(app._raw_pix2pos).replace('),','),\n ').replace('],','],\n') print(s) res=QMessageBox.question(self, "calibration", f"use calibration\n{s} ? ") if res==QMessageBox.Yes: geo=app._geometry geo.update_pix2pos(app._raw_pix2pos) app._cfg.setValue(AppCfg.GEO_PIX2POS, geo._lut_pix2pos) del app._raw_pix2pos def cb_update_opt_ctr(self, calib): app=QApplication.instance() if calib: _log.info("received new opt_ctr calibration") QMessageBox.information(self,'Optical center calibration','''\ - choose the biggest zoom level to start - choose at least 3 fiducial marks - for at least two zoom level: - Click (left mouse button) on the fiducial marks (always in the same order) - un-click button "Optical center calibration" ''') app._raw_opt_ctr=dict() else: geo=app._geometry ocOld=geo._opt_ctr try: geo.update_optical_center(app._raw_opt_ctr,True) except BaseException as e: _log.warning(e) QMessageBox.warning(self, "calibration", f"rejected\n{e}") del app._raw_opt_ctr return del app._raw_opt_ctr go=self._goOptCtr oc_sz=go.size() go.blockSignals(True) # avoid to call cb_marker_moved go.setPos(geo._opt_ctr-oc_sz/2) res=QMessageBox.question(self, "calibration", f"keep new optical center\n{geo._opt_ctr} ? ") if res==QMessageBox.Yes: app._cfg.setValue(AppCfg.GEO_OPT_CTR, geo._opt_ctr) else: geo._opt_ctr=ocOld go.setPos(geo._opt_ctr-oc_sz/2) go.blockSignals(False) # allow to call cb_marker_moved def cb_fiducial_update_z(self,obj,*vargs): obj._z=self.tweakers["base_z"].get_rbv() def module_fix_target_add_obj(self,*args,**kwargs): mft=self._moduleFixTarget fx=self.tweakers["fast_x"].get_rbv() fy=self.tweakers["fast_y"].get_rbv() bz=self.tweakers["base_z"].get_rbv() app=QApplication.instance() geo=app._geometry oc=geo._opt_ctr bm_pos=self._goBeamMarker.pos() bm_sz=self._goBeamMarker.size() idx=mft._cbType.currentIndex() param=mft._txtParam.text() #mft._cbType.addItems(["Fiducial", "FixTarget(12.5x12.5)", "FixTarget(23.0x23.0)", "FixTarget()", "Grid()", "SwissMX-path"]) #bm_pos_eu=self._goBeamMarker._pos_eu #bm_size_eu=self._goBeamMarker._size_eu try: #parse the parameters: 'key:value [,key:value]' # as key value separator : and = are allowed #examples: #Fiducial: # no param #FixTarget(12.5x12.5), FixTarget(23.0x23.0), FixTarget(test): # code_gen:2 # code_gen=3 | tmove=10 | twait=30 #Grid(): # size:(30, 20) # cnt:(30, 22) # fiducialSize:0.1 #SwissMX(): # ofs:(.2, .2) # width:10 # fidScl:.02 # fiducial:((.1, .1), (.1, 2.7), (10.3, .1), (10.3, 2.7)) #SwissFEL(): # ofs:(.2, .2) # width:10 # fidScl:.02 # fiducial:((.1, .1), (.1, 2.2), (10.3, .1), (10.3, 2.2)) pLst=param.split('|') pStr=list() for p in pLst: if not p: continue m=re.match('\s*(.*?)\s*[=:]\s*(.*)\s*', p) k, v=m.groups() v=v.replace('(', '[').replace(')', ']') pStr.append(f'"{k}":{v}') pStr='{'+','.join(pStr)+'}' param=json.loads(pStr) # "ofs":[10, 5],"width":200,"fidScl":0.5,"fiducial":[[18,7],[25,16],[70, 20]] except json.decoder.JSONDecodeError as e: _log.error(f'{e}:{param}') param=dict() if idx==0: #go=UsrGO.Fiducial(bm_pos+bm_sz/2-(20, 20), (40, 40),(fx,fy,bz)) l=.120 go=UsrGO.Fiducial((fx-l/2,fy-l/2), (l, l),bz) go.sigRegionChangeFinished.connect(self.cb_fiducial_update_z) elif idx==1: v=geo.pos2pix((12.5, 0)) l=np.linalg.norm(v) l=12.5 #ctr=bm_pos+bm_sz/2 go=UsrGO.FixTargetFrame((fx-l/2,fy-l/2), (l, l), tpl='12.5x12.5',**param) elif idx==2: v=geo.pos2pix((23, 0)) l=np.linalg.norm(v) l=23 go=UsrGO.FixTargetFrame((fx-l/2,fy-l/2), (l, l), tpl='23.0x23.0',**param) elif idx==3: w,h=(.120*12, .120*8) go=UsrGO.FixTargetFrame((fx-w/2,fy-h/2), (w, h), tpl='test',**param) elif idx==4: w,h=size=param.pop('size',(30, 20)) cnt=param.pop('cnt',(30, 22)) fiducialSize=param.pop('fiducialSize',.1) go=UsrGO.Grid((fx-w/2,fy-h/2), size, cnt, fiducialSize,**param) elif idx==5: ofs=param.pop('ofs',(.2, .2)) width=param.pop('width',10) fidScl=param.pop('fidScl',.02) fiducial=param.pop('fiducial',((.1, .1), (.1, 2.7), (10.3, .1), (10.3, 2.7))) fiducial=np.array(fiducial) gp=psi_device.shapepath.GenPath(); gp.swissmx(ofs=ofs, width=width, flipy=True) sz=gp.points.max(0)+np.array(ofs)*2 go=UsrGO.Path((fx-sz[0]/2, fy-sz[1]/2), sz, gp.points, fiducial, fidScl,**param) elif idx==6: ofs=param.pop('ofs',(.2, .2)) width=param.pop('width',10) fidScl=param.pop('fidScl',.02) fiducial=param.pop('fiducial',((.1, .1), (.1, 2.2), (10.3, .1), (10.3, 2.2))) fiducial=np.array(fiducial) gp=psi_device.shapepath.GenPath(); gp.swissfel(ofs=ofs, width=width,flipy=True) sz=gp.points.max(0)+np.array(ofs)*2 go=UsrGO.Path((fx-sz[0]/2, fy-sz[1]/2), sz, gp.points, fiducial, fidScl,**param) else: _log.error('index not handeled') grp=self._goTracked grp.addItem(go) mft._tree.setData(grp.childItems()) def module_fix_target_del_all_obj(self): app=QApplication.instance() geo=app._geometry try: #remove plane fitter if it esists del geo._fitPlane except AttributeError: pass mft=self._moduleFixTarget vb=self.vb grp=self._goTracked for go in grp.childItems(): vb.removeItem(go) grp.setFlag(grp.ItemHasNoContents) mft._tree.setData(None) def module_fix_target_fit_fiducial(self): mft=self._moduleFixTarget vb=self.vb grp=self._goTracked cldLst=grp.childItems() n=0 for go in cldLst: if type(go)==UsrGO.Fiducial: n+=1 ptFitTrf=np.ndarray((4,2)) # 4 (x,y) points to fit a transformation of a parallelogram ptFitPlane=np.ndarray((n,3)) #n (x,y,z) points to fit a plane transformation z=ax+by+c i=j=0 for go in reversed(cldLst): if type(go)==UsrGO.FixTargetFrame: if j==4: #fid=np.array(((0.1, 0.1), (0.1, 1.1), (1.1, 0.1), (1.1, 1.1))) fid=np.array(go._dscr['fiducial']['pos']) sz=np.array(go._dscr['size']) fid=fid/sz trf=geometry.geometry.least_square_trf(ptFitTrf,fid) #_log.debug(f'{trf}') _log.debug('\n trf='+str(trf).replace('\n','\n ')) tr=go.transform() tr.setMatrix(1, trf[1,0]/trf[0,0], 0, trf[0,1]/trf[1,1], 1, 0, 0, 0, 1) go.setTransform(tr) go.setPos(trf[:,2]) #go.setSize((1,1)) sz=(trf[0,0],trf[1,1]) go.setSize(sz) j=0 elif type(go)==UsrGO.Path: if j==4: fid=go._fiducial sz=go.szOrig fid=fid/sz trf=geometry.geometry.least_square_trf(ptFitTrf, fid) _log.debug('\n trf='+str(trf).replace('\n','\n ')) tr=go.transform() tr.setMatrix(1, trf[1, 0]/trf[0, 0], 0, trf[0, 1]/trf[1, 1], 1, 0, 0, 0, 1) go.setTransform(tr) go.setPos(trf[:, 2]) # go.setSize((1,1)) sz=(trf[0, 0], trf[1, 1]) go.setSize(sz) j=0 elif type(go)==UsrGO.Fiducial and go.size()[0]==0.12: if j=3: geo.least_square_plane(ptFitPlane) def cb_progress(self,i,sz,dlg): dlg.setValue(int(i*90/sz)) if dlg.wasCanceled(): raise AttributeError('canceled') def daq_collect(self, **kwargs):# points, visualizer_method, visualizer_params): ''' kwargs: code_gen: 0 pvt motion using ptsTrf 1 pvt motion using trf and points 2 pvt motion, compact grid code 3 pvt motion, compact grid code move and wait tmove: time to move twait: time to wait grid: grid dictionary with orig, pitch, count trf: transformation matrix points: list of points without transformation pts_trf: list of points with transformation ''' app = QApplication.instance() cfg = app._cfg dt_misc = cfg.value(AppCfg.DT_MISC) #_log.critical('abort for test');return # RET geo = app._geometry shutter=app._shutter fn='/tmp/shapepath' try: dt=app._deltatau except AttributeError: app._deltatau=dt=psi_device.Deltatau(app._args.sim&0x100!=0) try: jf=app._jungfrau except AttributeError: app._jungfrau=jf=psi_device.Jungfrau(app._args.sim&0x80!=0) jf.config(**kwargs) sp=dt._shapepath sp.verbose=dt_misc['verbose'] sp.meta['fel_per']=dt_misc['fel_per'] sp.meta['sync_mode']=dt_misc['sync_mode'] sp.meta['sync_flag']=dt_misc['sync_flag'] #pv-monitor-func: stop monitors if not cfg.value(AppCfg.GBL_MISC)['live_on_collect']: pv_mon_lst=app._pv_mon_lst for pv in pv_mon_lst: pv.auto_monitor=False with pg.ProgressDialog('Progress', 0, 100) as dlg: dlg.setWindowModality(Qt.WindowModal) #dlg.setRange(0, 0) #dlg.setCancelButtonText("Abort Acquisition") #dlg.canceled.connect(self.complete_daq) #dlg.setAutoClose(True) #dlg.show() dlg.setLabelText("Setup Gather/Sync");dlg+=5 code_gen=kwargs.get('code_gen',0) sp.setup_sync(verbose=sp.verbose&0x40, timeOfs=dt_misc['time_ofs'], timeCor=dt_misc['time_cor']) dlg.setLabelText("Download motion program");dlg+=5 if code_gen==0: sp.setup_motion(fnPrg=fn+'.prg', mode=3, scale=1., dwell=10, points=kwargs['pts_trf']) elif code_gen==1: sp.setup_motion(fnPrg=fn+'.prg', mode=3, scale=1., dwell=10, points=kwargs['points'],trf=kwargs['trf']) elif code_gen==2: sp.setup_motion(fnPrg=fn+'.prg', mode=4, scale=1., dwell=10, grid=kwargs['grid'], trf=kwargs['trf']) elif code_gen==3: sp.setup_motion(fnPrg=fn+'.prg', mode=5, dwell=10, tmove=kwargs['tmove'] ,twait=kwargs['twait'], grid=kwargs['grid'],trf=kwargs['trf']) sp.setup_gather() try: p=geo._fitPlane # TODO: Cleanup if code_gen==0: # X has inverted sign ! # Z is in um -> therefore the offset must be multiplied with 1000 ! Z motor has opposite sign ! #cz=f'{+p[0]:+.18g}X{-p[1]:+.18g}Y{-p[2]*1000:+.18g}' t=p*np.array((1,1,1000)) cz=f'{t[0]:+.18g}X{t[1]:+.18g}Y{t[2]:+.18g}' else: trf=kwargs['trf'] # grid-coord -> motor-um #(0,0,1)*trf # trf*'gridpos in um' -> motor pos in mm trf2=np.asmatrix(np.identity(3)) trf2[:, :2]=trf #p1=(0,0,1)*trf2 # =matrix([[-2376.8, 1376.8, 1. ]]) um trf3=np.matrix( ((p[0],0,0),(p[1],0,0),(1000*p[2],0,1)) ) #(0, 0, 1)*trf2*trf3 -> z in um of gridpos(0,0) t=(trf2*trf3)[:,0].A.ravel() cz=f'{t[0]:+.18g}X{t[1]:+.18g}Y{t[2]:+.18g}' except AttributeError: cz='0' _log.warning('no plane fitting done. z does not move') trf=kwargs['trf'] if code_gen==0: fx='X';fy='Y' else: fx=f'{trf[0,0]:+.18g}X{trf[1,0]:+.18g}Y{trf[2,0]:+.18g}' fy=f'{trf[0,1]:+.18g}X{trf[1,1]:+.18g}Y{trf[2,1]:+.18g}' if _log.level==logging.DEBUG: try: t except NameError: t=(np.nan,np.nan,np.nan) fn='/tmp/coord.log' _log.debug(f'write all coordinates to {fn}') if code_gen==0: xy1=np.hstack((kwargs["pts_trf"],np.ones((kwargs["pts_trf"].shape[0],1)))) else: xy1=np.hstack((kwargs["points"],np.ones((kwargs["points"].shape[0],1)))) pcz=np.matrix(((t[0],t[1],t[2])))*xy1.T x_y_fx_fy_cz=np.vstack((kwargs["points"].T, kwargs["pts_trf"].T, pcz)).A with open('/tmp/coord.log','w') as fh: fh.write(' x y fx fy cz\n') for i in range(x_y_fx_fy_cz.shape[1]): fh.write('%9.7g %9.7g %9.7g %9.7g %9.7g\n'%tuple(x_y_fx_fy_cz[:,i])) sp.setup_coord_trf(fx, fy, cz) # reset to shape path system comm=sp.comm if comm is None: _log.info('simulated') else: comm.gpascii._cb_func=lambda i, sz:self.cb_progress(i, sz, dlg) #_log.critical('TEMPORARY DO NOT EXECUTE PROGRAM !!!');return if dlg.wasCanceled(): # pv-monitor-func: start monitors if not cfg.value(AppCfg.GBL_MISC)['live_on_collect']: for pv in pv_mon_lst: pv.auto_monitor=True return dlg.setLabelText("Homing and get ready");dlg+=5 sp.homing() # homing if needed sp.run() # start motion program sp.wait_armed() # wait until motors are at first position shutter.open() time.sleep(1.1) num_pts=kwargs['num_pts'] jf.acquire(num_pts) sp.trigger(1.0) # send a start trigger (if needed) after given time _log.info('start trigger sent') if dt._comm is None: dlg.setLabelText("run motion/acquisition (simulated)") dlg.setMaximum(num_pts) for p in range(0,num_pts,1+int(num_pts/100)): #_log.info(f'progress {p}/{num_pts}') dlg.setValue(p) time.sleep(.1) else: dlg.setLabelText("run motion/acquisition") dlg.setMaximum(num_pts) while True: p=int(sp.progress()) _log.info(f'progress {p}/{num_pts}') if p<0: break elif dlg.wasCanceled(): dt._comm.gpascii.send_block('&1a;Gather.Enable=0') break #_log.info(f'progress {p}/{sp.points.shape[0]}') dlg.setValue(max(p,num_pts)) time.sleep(1.) #wait 1 sec instead of .1...hopefully less segmentation faults if not dlg.wasCanceled(): jf.gather_upload() dlg.setLabelText("upload gather data") sp.gather_upload(fnRec=fn+'.npz') if dt_misc['show_plots']: dp=psi_device.shapepath.DebugPlot(sp) dp.plot_gather(mode=11) plt.show(block=False) #plt.show(block=True) _log.info('Shutter closed') shutter.close() # pv-monitor-func: start monitors if not cfg.value(AppCfg.GBL_MISC)['live_on_collect']: for pv in pv_mon_lst: pv.auto_monitor=True def esc_run_steps(self, steps, title, esc_state): self._esc_state ="busy" with pg.ProgressDialog(title, 0, len(steps)) as dlg: for step in steps: step() dlg += 1 if dlg.wasCanceled(): QMessageBox.warning(self, "escape steps", "ABORTED" + title) break self._esc_state = esc_state def move_gonio(self, pos): # gonio="{'pos_mount': (0.0,0.0,0.0,0.0), 'pos_align': (0.0,0.0,0.0,0.0)}" # positions are: fast_x,fast_y,base_x,base_z,(omega=0,det_z, no change) app=QApplication.instance() cfg=app._cfg pos_gonio = cfg.value(AppCfg.DFT_POS_GONIO) if pos_gonio is None: msg="gonio default positions are not configured." _log.warning(msg) QMessageBox.warning(self, "gonio not configured", msg) return tw_fx = self.tweakers["fast_x"] tw_fy = self.tweakers["fast_y"] tw_cx = self.tweakers["base_x"] tw_cy = self.tweakers["base_y"] tw_cz = self.tweakers["base_z"] try: p_fx, p_fy, p_cx, p_cz,=pos_gonio['pos_'+pos] except KeyError: raise ValueError("Goniometer position *{}* is not known!!") _log.info(f"moving goniometer {pos} to fx:{p_fx:.5g},fy:{p_fy:.5g},cx:{p_cx:.5g},cz_{p_cz:.5g}") #,ry:{p_ry:.5g}") tw_fx.move_abs(p_fx) tw_fy.move_abs(p_fy) tw_cx.move_abs(p_cx) tw_cz.move_abs(p_cz) app_utils.assert_tweaker_positions([ (tw_fx, p_fx, 0.1), (tw_fy, p_fy, 0.1), (tw_cx, p_cx, 0.1), (tw_cz, p_cz, 0.1), ], timeout=20.0, ) def move_post_tube(self, pos): # post_sample_tube="{'x_up': -0.2, 'y_up': 0.0, 'x_down': 0.0, 'y_down': 0.0, 'x_out_delta': 0.0, 'y_out_delta': 0.0, 'z_in': 0.0, 'z_out': 0.0}" app=QApplication.instance() cfg=app._cfg pos_pst = cfg.value(AppCfg.DFT_POS_PST) if pos_pst is None: msg = "SwissMX *POST-SAMPLE-TUBE* configuration is incomplete!!!" _log.warning(msg) QMessageBox.warning(self, "post tube not configured", msg) return x_in_us=pos_pst['x_in_us'] y_in_us=pos_pst['y_in_us'] x_in_ds=pos_pst['x_in_ds'] y_in_ds=pos_pst['y_in_ds'] x_out_d=pos_pst['x_out_delta'] y_out_d=pos_pst['y_out_delta'] z_in =pos_pst['z_in'] z_out =pos_pst['z_out'] tw_x_us = self.tweakers["tube_usx"] tw_y_us = self.tweakers["tube_usy"] tw_x_ds = self.tweakers["tube_dsx"] tw_y_ds = self.tweakers["tube_dsy"] tw_z = self.tweakers["tube_z"] tandem_twv = float(self._post_tandem_tweak_val.text()) _log.info(f"move post sample tube {pos}") if pos == "in": tw_x_us.move_abs(x_in_us) tw_y_us.move_abs(y_in_us) tw_x_ds.move_abs(x_in_ds) tw_y_ds.move_abs(y_in_ds) try: app_utils.assert_tweaker_positions([ (tw_x_us, x_in_us, 0.1), (tw_y_us, y_in_us, 0.1), (tw_x_ds, x_in_ds, 0.1), (tw_y_ds, y_in_ds, 0.1), ],timeout=20.0, ) except app_utils.PositionsNotReached as e: msg=f"failed to move post sample tube {pos}: {e}" _log.warning(msg) QMessageBox.warning(self, "failed to move post sample tube", msg,) raise tw_z.move_abs(z_in, wait=True) try: app_utils.assert_tweaker_positions([(tw_z, z_in, 0.1)]) except app_utils.PositionsNotReached as e: msg=f"failed to move post sample tube {pos}: {e}" _log.warning(msg) QMessageBox.warning(self, "failed to move post sample tube", msg,) raise elif pos == "out": tw_z.move_abs(z_out, wait=True) try: app_utils.assert_tweaker_positions([(tw_z, z_out, 0.1)]) except app_utils.PositionsNotReached as e: msg=f"failed to move post sample tube {pos}: {e}" _log.warning(msg) QMessageBox.warning(self, "failed to move post sample tube", msg,) raise x_out_us=x_in_us+x_out_d; tw_x_us.move_abs(x_out_us) y_out_us=y_in_us+y_out_d; tw_y_us.move_abs(y_out_us) x_out_ds=x_in_ds+x_out_d; tw_x_ds.move_abs(x_out_ds) y_out_ds=y_in_ds+y_out_d; tw_y_ds.move_abs(y_out_ds) try: app_utils.assert_tweaker_positions([ (tw_x_us, x_out_us, 0.1), (tw_y_us, y_out_us, 0.1), (tw_x_ds, x_out_ds, 0.1), (tw_y_ds, y_out_ds, 0.1), ], timeout=20.0, ) except app_utils.PositionsNotReached as e: msg=f"failed to move post sample tube {pos}: {e}" _log.warning(msg) QMessageBox.warning(self, "failed to move post sample tube", msg,) raise elif pos == "x+": tw_x_us.move_rel(tandem_twv) tw_x_ds.move_rel(tandem_twv) elif pos == "x-": tw_x_us.move_rel(-tandem_twv) tw_x_ds.move_rel(-tandem_twv) elif pos == "y+": tw_y_us.move_rel(tandem_twv) tw_y_ds.move_rel(tandem_twv) elif pos == "y-": tw_y_us.move_rel(-tandem_twv) tw_y_ds.move_rel(-tandem_twv) else: raise ValueError("post sample tube *{}* is not known!!") def move_detector(self, pos): # detector="{'pos_in': -0.1, 'pos_out': 0.0}" app=QApplication.instance() cfg=app._cfg pos_det = cfg.value(AppCfg.DFT_POS_DET) if pos_det is None: msg="detector default positions are not configured." _log.warning(msg) QMessageBox.warning(self, "post tube not configured", msg) return det_z = self.tweakers["det_z"] try: p_z=pos_det['pos_'+pos] except KeyError: raise ValueError("Collimator position *{}* is not known!!") _log.info(f"moving detector {pos} to {p_z:.5g}") det_z.move_abs(p_z, assert_position=True) def move_collimator(self, pos): # collimator="{'x_in': 0.0, 'y_in': -0.1, 'x_out_delta': 0.0, 'y_out_delta': 0.0}" app=QApplication.instance() cfg=app._cfg cx = self.tweakers["colli_x"] cy = self.tweakers["colli_y"] pos_col = cfg.value(AppCfg.DFT_POS_COL) if pos_col is None: msg="COLLIMATOR configuration is incomplete!" _log.warning(msg) QMessageBox.warning(self, "post tube not configured", msg) return x_in = pos_col['x_in'] y_in = pos_col['y_in'] x_out = pos_col['x_out'] y_out = pos_col['y_out'] if pos == "out": x_pos=x_out y_pos=y_out elif pos == "in": x_pos=x_in y_pos=y_in elif pos == "ready": x_pos=x_in y_pos=y_out else: raise ValueError("Collimator position *{}* is not known!!") _log.info(f"moving collimator {pos} to X,Y = {x_pos:.3f}, {y_pos:.3f}") cx.move_abs(x_pos, assert_position=True) cy.move_abs(y_pos, assert_position=True) if __name__=="__main__": def main(): #_log.info(f'Version: pyqtgraph:{pg.__version__} matplotlib:{mpl.__version__} numpy:{np.__version__} epics:{epics.__version__} qt:{QT_VERSION_STR}' ) _log.info(f'Version: pyqtgraph:{pg.__version__} epics:{epics.__version__} qt:{QT_VERSION_STR}' ) import argparse, socket hostname=socket.gethostname() if hostname=='ganymede': #use EPICS locally #os.environ['EPICS_CA_ADDR_LIST']='localhost' #use EPICS if connected to ESC network os.environ['EPICS_CA_ADDR_LIST'] ='129.129.244.255 sf-saresc-cagw.psi.ch:5062 sf-saresc-cagw.psi.ch:5066' #(h, t)=os.path.split(sys.argv[0]);cmd='\n '+(t if len(h)>20 else sys.argv[0])+' ' #exampleCmd=('', '-m0xf -v0') epilog=__doc__ #+'\nExamples:'+''.join(map(lambda s:cmd+s, exampleCmd))+'\n' parser=argparse.ArgumentParser(epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument('-l', '--log', type=int, help='debug(1)|info(2)|warn(3)|error(4) default=%(default)d', default=2) parser.add_argument("--sim", "-s", type=lambda x: int(x,0), help="simulate devices (see bitmasks) default=0x%(default)x", default=0) args = parser.parse_args() _log.info('Arguments:{}'.format(args.__dict__)) if args.log!=2: for l in ('app_config', 'app_utils', 'backlight', 'camera', 'epics_widgets', 'geometry', 'illumination', 'ModuleFixTarget', 'MXMotion', 'pbtools', 'psi_device', 'pyqtUsrObj', 'shapepath', 'swissmx', 'zoom',): try: logging.getLogger(l).setLevel(args.log*10) except BaseException as e: _log.error(f'failed set log level ({l}): {e}') if args.log<2: listLoggers() from PyQt5.QtWidgets import QApplication # set app icon app = QApplication(sys.argv) app_icon = QtGui.QIcon() app_icon.addFile("artwork/logo/16x16.png", QtCore.QSize(16, 16)) app_icon.addFile("artwork/logo/24x24.png", QtCore.QSize(24, 24)) app_icon.addFile("artwork/logo/32x32.png", QtCore.QSize(32, 32)) app_icon.addFile("artwork/logo/48x48.png", QtCore.QSize(48, 48)) app_icon.addFile("artwork/logo/256x256.png", QtCore.QSize(256, 256)) app.setWindowIcon(app_icon) startupWin=StartupSplash() #for i in range(1, 20): # startupWin.set(i,f'sample startup text {i}') # #app.processEvents() # t = time.time() # #while time.time() < t + 0.1: # # app.processEvents() #app._settings=QSettings("PSI", "SwissMX") app._args=args app._pv_mon_lst=list() #pv-monitor-func: list of all pv that are monitored and that should # be turned on/off during data collection to avoid (hopefully) segmentation fault startupWin.set(5, f'load settings') app._cfg=cfg=AppCfg() app._geometry=geometry.geometry() app._geometry._lut_pix2pos=cfg.value(cfg.GEO_PIX2POS) app._geometry._opt_ctr=cfg.value(cfg.GEO_OPT_CTR) startupWin.set(15, f'connect backlight') pfx=app._cfg.value(AppCfg.GBL_DEV_PREFIX)[0] dft_pos_bklgt=app._cfg.value(AppCfg.DFT_POS_BKLGT) if args.sim&0x01: app._backlight = backlight.Backlight(None,dft_pos_bklgt) else: app._backlight = backlight.Backlight(f'{pfx}:MOT_BLGT',dft_pos_bklgt) startupWin.set(20, f'connect illumination') if args.sim&0x02: app._illumination = illumination.IlluminationControl(None) else: app._illumination = illumination.IlluminationControl() startupWin.set(40, f'connect zoom') if args.sim&0x04: app._zoom = zoom.QopticZoom(None) else: app._zoom = zoom.QopticZoom() startupWin.set(60, f'connect camera') if args.sim&0x08: app._camera = camera.epics_cam(None) app._camera.sim_gen(mode=1) else: app._camera = camera.epics_cam() if args.sim&0x80: app._shutter = psi_device.Shutter(0) else: app._shutter = psi_device.Shutter(1) #app._camera.run() is called in center_piece_update startupWin.set(60, f'start main window') app._mainWnd=wnd=WndSwissMx() wnd.show() startupWin._wnd.finish(wnd) # needed so pycharm can restart application signal.signal(signal.SIGINT, sigint_handler) #main.update_user_and_storage() #ZAC: orig. code sys.exit(app.exec_()) main()