#!/usr/bin/env python3 import json import random import re import signal import threading from epics.ca import pend_event from matplotlib import pyplot import Wigis import app_utils import jungfrau_widget import bernina_pulse_picker import qutilities import storage from app_config import settings, appsconf, option, toggle_option, simulated # publisher = pubber.Publisher() from zmq_escape import zescape from helicalscan import HelicalScanGui SKIP_ESCAPE_TRANSITIONS_IF_SAFE = "escape/skip_transitions_if_safe" BEAM_MARKER_POSITIONS = "beam/marker_positions" BEAM_SIZE = "beam/size" CRYOJET_MOTION_ENABLED = "cryojet/motion_enabled" CRYOJET_NOZZLE_OUT = "cryojet/nozzle_out" CRYOJET_NOZZLE_IN = "cryojet/nozzle_in" DELTATAU_SHOW_PLOTS = "deltatau/show_plots" DELTATAU_OMEGACOS = "deltatau/omegacos" DELTATAU_SORT_POINTS = "deltatau/sort_points" DELTATAU_VELOCITY_SCALE = "deltatau/velocity_scale" CAMERA_TRANSFORMATIONS = "camera/transformations" CAMERA_ZOOM_TO_PPM = "camera/zoom_to_ppm" EXPERIMENT_PGROUP = "experiment/pgroup" EXPERIMENT_UID = "experiment/uid" ACTIVATE_PULSE_PICKER = "scanning/activate_pulse_picker" import logging logging.getLogger("PyQt5.uic").setLevel(logging.CRITICAL) logging.getLogger("requests").setLevel(logging.CRITICAL) logging.getLogger("urllib3").setLevel(logging.CRITICAL) logging.getLogger("paramiko").setLevel(logging.CRITICAL) FORMAT = "%(asctime)-15s - %(levelname)s - %(name)s - %(message)s" logging.basicConfig(format=FORMAT, level=logging.DEBUG) logger = logging.getLogger("swissmx") import PrelocatedCoordinatesModel from EmblModule import EmblWidget from HelicalTable import HelicalTableWidget from Wigis import Spinner, Checkbox import camera import epics import numpy as np from exceptions import * import qoptic from backlight import Backlight backlight = Backlight() import mx_swissfel swissfel = mx_swissfel.SwissFELMachine() import qtawesome from bernina_pulse_picker import pulsePicker from PyQt5 import QtCore, QtGui from PyQt5.QtCore import Qt, pyqtSlot, QSize, QRegExp, pyqtSignal, QObject, QThread from PyQt5.QtGui import QKeySequence, QPixmap, QRegExpValidator from PyQt5.QtWidgets import ( QLabel, QGroupBox, QVBoxLayout, QPushButton, QWidget, QHBoxLayout, QGridLayout, QDoubleSpinBox, QLineEdit, QSpinBox, QFormLayout, QShortcut, QMessageBox, QAction, QSplashScreen, QProgressBar, QSizePolicy, QToolBox, QPlainTextEdit, QTextBrowser, QProgressDialog, QFileDialog, ) from PyQt5.uic import loadUiType from CustomROI import BeamMark, Grid, CrystalCircle from GenericDialog import GenericDialog from dialogs.PreferencesDialog import PreferencesDialog from epics_widgets import zoom from epics_widgets.MotorTweak import MotorTweak from epics_widgets.SmaractMotorTweak import SmaractMotorTweak _URL = "http://PC12288:8080" # import TellClient # tell = TellClient.TellClient(_URL) import eventer from detector_control import jungfrau_detector from findxtal import findObj import pyqtgraph as pg import pyqtgraph.exporters if simulated: logger.warning("simulation mode enabled") qoptic_zoom = qoptic.FeturaClientBogus() else: qoptic_zoom = qoptic.FeturaClient() pg.setConfigOption("antialias", True) # from epics_widgets import MotorTweak Ui_MainWindow, QMainWindow = loadUiType("swissmx.ui") user = getpass.getuser() home = os.path.expanduser("~") just_quit = user in ["e10003", "gac-esbmx"] folders = storage.Folders() TASK_JUNGFRAU_SETTINGS = "jungfrau_settings" TASK_SETUP_PPM_CALIBRATION = "ppm_calibration" TASK_SETUP_PPM_CALIBRATION_TBOX = "ppm_calibration_tbox" TASK_SETUP_BEAM_CENTER = "beamcenter_calibration" TASK_SETUP_CAMERA = "setup_camera" TASK_SETUP_ROI = "setup_rois" TASK_SAMPLE_SELECTION = "task_sample_selection" TASK_SCREENING = "screening" TASK_POINT_SHOOT = "point_shoot" TASK_GRID = "grid" TASK_PRELOCATED = "prelocated" TASK_HELICAL = "helical" TASK_EMBL = "embl" from deltatau import DeltaTau, shapepath, helical, DebugPlot delta_tau = DeltaTau() BROKER_SERVER = "127.0.0.1" BROKER_PORT = 61613 # sample_camera = camera.camera_server(basename="SARES20-PROF142-M1") logger.info( f"connecting to microscope to camera server: {appsconf['microscope']['sample_camera']['pv_prefix']} " ) sample_camera = camera.camera_server( basename=appsconf["microscope"]["sample_camera"]["pv_prefix"] ) # sample_camera.set_transformations([camera.Transforms.FLIP_LR, camera.Transforms.ROTATE_270]) def print_transform(t): m11 = t.m11() m12 = t.m12() m13 = t.m13() m21 = t.m21() m22 = t.m22() m23 = t.m23() m31 = t.m31() m32 = t.m32() m33 = t.m33() print("{:20s}{:12.3f} {:12.3f} {:12.3f}".format("Transform", m11, m12, m13)) print("{:20s}{:12.3f} {:12.3f} {:12.3f}".format(" ", m21, m22, m23)) print("{:20s}{:12.3f} {:12.3f} {:12.3f}".format(" ", m31, m32, m33)) def tdstamp(): return time.strftime("%Y%m%dH%H%M%S") def datestamp(): return time.strftime("%Y%m%d") class AcquisitionAbortedException(Exception): pass class Sequencer(QObject): finished = pyqtSignal() timeoutExpired = pyqtSignal() errorHappened = pyqtSignal(str) def __init__(self, steps): super().__init__() self.steps = steps def run_sequence(self): num_steps = len(self.steps) for n, step in enumerate(self.steps): logger.info("runing step {}/{}".format(n, num_steps)) step() logger.info("emiting finished") self.finished.emit() class Main(QMainWindow, Ui_MainWindow): pixelsPerMillimeter = pyqtSignal(float) beamCameraCoordinatesChanged = pyqtSignal(float, float) addGridRequest = pyqtSignal(float, float) zoomChanged = pyqtSignal(float) folderChanged = pyqtSignal(str) prefixChanged = pyqtSignal(str) projectChanged = pyqtSignal(str) gridUpdated = pyqtSignal(int) # index in self._grids gonioMoveRequest = pyqtSignal(float, float, float, float, float) fast_x_position = pyqtSignal(float) fast_y_position = pyqtSignal(float) fast_dx_position = pyqtSignal(float) fast_dy_position = pyqtSignal(float) fiducialPositionSelected = pyqtSignal( float, float, float, float ) # camx, camy, gonx, gony appendPrelocatedPosition = pyqtSignal(float, float, float, float) appendPrelocatedFiducial = pyqtSignal(bool, float, float, float, float) increaseRunNumberRequest = pyqtSignal() daqAborted = pyqtSignal() def __init__(self,): super(Main, self).__init__() self.setupUi(self) if os.getenv("DEVELOPMENT_VERSION"): title = "[DEVELOPMENT] SwissMX - Solid support scanning for FEL sources" else: title = "SwissMX - Solid support scanning for FEL sources" self.setWindowTitle(title) self._do_quit = False qtawesome.load_font( "material", "MaterialIcons-Regular.ttf", "MaterialIcons-Regular.json", "fonts/", ) QtGui.QFontDatabase.addApplicationFont("fonts/Inconsolata-Bold.ttf") QtGui.QFontDatabase.addApplicationFont("fonts/Baloo-Regular.ttf") self._pv_shutter = None # epics.PV('X06SA-ES-MD2:SHUTTER') self._has_pulse_picker = False self._has_camera = True self._at_x06sa = False self._at_cristalina = True self._at_lab_eh060 = False self.init_settings() # self.create_escape_toolbar() self.tweakers = {} self.setup_sliders() self.glw = pg.GraphicsLayoutWidget() self.microscope_page.setLayout(QVBoxLayout()) self.microscope_page.layout().addWidget(self.glw) self.glw.show() self.vb = self.glw.addViewBox(enableMenu=False) self.img = pg.ImageItem() self.img._swissmx_name = "microscope_image_item" # self.graphicsView.setCentralItem(self.vb) self.glw.scene().sigMouseMoved.connect(self.mouse_move_event) self.glw.scene().sigMouseClicked.connect(self.mouse_click_event) self.vb.setAspectLocked() self.vb.setBackgroundColor((120, 90, 90)) self.vb.addItem(self.img) self._escape_current_state = "Maintenance" self._pin_mounting_offset = 0.0 self._mouse_tracking = False self._rois = [] # add a generic ellipse # pen = QtGui.QPen(QColor(Qt.yellow)) # pen.setWidth(5) # ellipse1 = QtWidgets.QGraphicsEllipseItem(650, 700, 500, 300) # ellipse1.setFlag(QGraphicsItem.ItemIsSelectable) # ellipse1.setFlag(QGraphicsItem.ItemIsMovable) # ellipse1.setPen(pen) # self.vb.addItem(ellipse1) # self.vb.addItem(Grid(pos=(650, 700), size=(100, 200), cell_size=(3, 4), parent=self)) # self.vb.addItem(Retangulo(pos=(750, 700), size=(100, 200), pen=pg.fn.mkPen(color=[230, 150, 200], width=4.7))) # self.vb.addItem(pg.PolyLineROI([[80, 60], [90, 30], [60, 40]], pen=(6,9), closed=True)) # test # x2 = np.linspace(-100, 100, 1000) # data2 = np.sin(x2) / x2 # # p8 = self.glw.addPlot(row=1, col=0, title="Region Selection") # plitem = pg.PlotItem() # plitem.plot(data2, pen=(255, 255, 255, 200)) # # lr = pg.LinearRegionItem([400, 700]) # # lr.setZValue(-10) # self.vb.addItem(plitem) self._message_critical_fault = QLabel(None) self._message_critical_fault.setAccessibleName("device_fault") self.statusbar.insertWidget(0, self._message_critical_fault) self._lb_coords = QLabel(None) self._lb_coords.setText("coords") self.statusbar.addPermanentWidget(self._lb_coords) self._fel_status = QLabel(None) self._fel_status.setAccessibleName("fel_status_statusbar") self.statusbar.addPermanentWidget(self._fel_status) swissfel.statusStringUpdated.connect(self._fel_status.setText) self._status_task = QLabel(None) self._status_task.setAccessibleName("status_task_label") self.statusbar.addPermanentWidget(self._status_task) self.add_beam_marker() self._beam_markers = {} # ppm calibration self._zoom_to_ppm = {} self._ppm_click = None self.load_stylesheet() self.folderChanged.connect(lambda t: print("folder now: {}".format(t))) # FIXME self.init_actions() self.prepare_microscope_page() self.prepare_embl_gui() self.prepare_left_tabs() self.update_beam_marker(qoptic_zoom.get_position()) self._centerpiece_stack.setCurrentIndex(0) self._centerpiece_stack.currentChanged.connect(self.center_piece_update) self.init_validators() self.init_settings_tracker() self.wire_storage() self.create_helical_widgets() self.center_piece_update(0) # start camera updater curzoom = qoptic_zoom.get_position(cached=False) logger.debug(f"starting app with zoom at {curzoom}") self.zoom_changed_cb(curzoom) self._tabs_daq_methods.currentChanged.connect(self.switch_task) self.switch_task() def create_helical_widgets(self): tbox = self._helical_tablebox htab = self._helical_scan_table = HelicalTableWidget() htab.gonioMoveRequest.connect(self.move_gonio_to_position) tbox.setLayout(QVBoxLayout()) grp = QWidget() grp.setLayout(QFormLayout()) le = QSpinBox() le.setRange(1, 100) le.setValue(htab.scanHorizontalCount()) le.valueChanged.connect(lambda cnt: htab.setScanHorizontalCount(cnt)) grp.layout().addRow("Horizontal Count", le) le = QSpinBox() le.setRange(1, 100) le.setValue(htab.scanVerticalCount()) le.valueChanged.connect(lambda cnt: htab.setScanVerticalCount(cnt)) grp.layout().addRow("Vertical Count", le) le = QDoubleSpinBox() le.setRange(-180.0, 180.0) le.setSingleStep(5.0) le.setSuffix(" degrees") le.valueChanged.connect(htab.setStartAngle) grp.layout().addRow("Start angle", le) tbox.layout().addWidget(grp) widgi = QWidget() widgi.setLayout(QHBoxLayout()) tbox.layout().addWidget(widgi) but = QPushButton("Add Crystal") but.clicked.connect(htab.add_xtal) widgi.layout().addWidget(but) but = QPushButton("Set START") but.clicked.connect(lambda: htab.set_xtal_start(self.get_gonio_positions())) widgi.layout().addWidget(but) but = QPushButton("Set END") but.clicked.connect(lambda: htab.set_xtal_end(self.get_gonio_positions())) widgi.layout().addWidget(but) tbox.layout().addWidget(htab) def add_beam_marker(self): w, h = settings.value(BEAM_SIZE) self._beammark = BeamMark([100, 100], (int(w), int(h)), parent=self) self.vb.addItem(self._beammark) def camera_pause_toggle(self): sample_camera.pause() def updateImage(self, pause=False): if not sample_camera.is_paused(): img = sample_camera.get_image() if img is not None: self.img.setImage(img) def init_settings_tracker(self): logger.info("configuring widget persistence") fields = { # 'folder': (self._label_folder, str), "project": (self._le_project, str), "prefix": (self._le_prefix, str), "actual_prefix": (self._label_actual_prefix, str), "exposureTime": (self._dsb_exposure_time, float), "oscillationAngle": (self._dsb_oscillation_step, float), "blastRadius": (self._dsb_blast_radius, float), } for key, f_config in fields.items(): widget, conv = f_config value = settings.value(key) try: wset, wget = widget.setText, widget.text logger.debug("tracking text field {}".format(key)) except AttributeError: logger.debug("tracking {} number field {}".format(conv, key)) wset, wget = widget.setValue, lambda fget=widget.value: conv(fget()) except Exception as e: logger.error(e) try: wset(conv(value)) except Exception as e: logger.debug(e) logger.warning( 'failed for "{}" updating field of type {} with {}'.format( key, type(widget), value ) ) finally: # logger.debug('wget = {}; wset = {}'.format(wget, wset)) widget.editingFinished.connect( lambda w=widget, k=key, func_get=wget: self.persist_setting( k, func_get() ) ) self.storage_cascade_prefix(None) def init_validators(self): identifier_regex = QRegExp("[a-z-A-Z_0-9%]+") self._le_project.setValidator( QRegExpValidator(identifier_regex, self._le_project) ) self._le_prefix.setValidator( QRegExpValidator(identifier_regex, self._le_prefix) ) def wire_storage(self): self._le_prefix.textChanged.connect(self.storage_cascade_prefix) self._le_prefix.textChanged.connect( lambda newtext: self.prefixChanged.emit(newtext) ) self._le_project.textChanged.connect(self.storage_cascade_prefix) self._le_project.textChanged.connect( lambda newtext: self.projectChanged.emit(newtext) ) self._le_prefix.editingFinished.connect(self.prepare_daq_folder) self._le_project.editingFinished.connect(self.prepare_daq_folder) self.increaseRunNumberRequest.connect(self.increase_run_number) def storage_cascade_prefix(self, val): prefix = self._le_prefix.text() if 0 == len(prefix): logger.warning("empty prefix is not accepted") self._le_prefix.setAccessibleName("invalid_input") self._le_prefix.blockSignals(True) self._le_prefix.setText("INVALID=>" + prefix) QMessageBox.warning(self, "prefix is not valid", "Prefix is not valid!") self._le_prefix.blockSignals(False) return else: self._le_prefix.setAccessibleName("") project = self._le_project.text() folders.set_prefix(prefix) folders.set_project(project) folders.run = settings.value("run_number", type=int) self._label_runnumber.setText(f"{folders.run:04d}") self._data_folder = folders.raw_folder self.folderChanged.emit(folders.raw_folder) self._label_actual_prefix.setText(folders.prefix) self._label_folder.setText(folders.raw_folder) def increase_run_number(self): run = settings.value("run_number", type=int) run += 1 settings.setValue("run_number", run) folders.run = run self._label_runnumber.setText(f"{run:04d}") def prepare_daq_folder(self): global home, folders prefix = folders.prefix folder = folders.res_folder if 0 == len(prefix): return try: os.makedirs(folder, 0o750, exist_ok=True) except: msg = "Failed to create folder: {}".format(folder) logger.warning(msg) QMessageBox.warning( self, "Screenshot: failed to create folder", "Failed to create output folder for screenshot!\n\n\tScreenshot not taken!", ) raise fname = os.path.join(folders.pgroup_folder, ".latest_raw") try: with open(fname, "w") as f: f.write(folders.raw_folder) logger.info("wrote: {}".format(fname)) except: logger.warning("failed writing {}".format(fname)) fname = os.path.join(folders.pgroup_folder, ".latest_res") try: with open(fname, "w") as f: f.write(folders.res_folder) logger.info("wrote: {}".format(fname)) except: logger.warning("failed writing {}".format(fname)) def persist_setting(self, s, v): logger.debug("persisting {} = {}".format(s, v)) settings.setValue(s, v) def method_changed(self, index): method = self._tabs_daq_methods.currentWidget().accessibleName() logger.info("method now => {}".format(method)) def center_piece_update(self, index): if index > 0: # not showing camera image logger.warning("listening to zescape") self.timer.stop() zescape.start_listening() self.timer = QtCore.QTimer(self) self.timer.timeout.connect(self.check_zescape) self.timer.start(20) else: zescape.stop_listening() logger.warning("re-starting timer on camera update") try: self.timer.stop() except: pass self.timer = QtCore.QTimer(self) self.timer.timeout.connect(self.updateImage) self.timer.start(10) def check_zescape(self): msg = zescape.check() if msg is None: return if "current" in msg: logger.warning(f"current state: {self._escape_current_state}") zescape.reply(self._escape_current_state) elif "goto" in msg: state = msg.split()[1].lower() logger.warning(f"TELL requests to go to {state}") try: if "sampleexchange" in state: logger.debug( f"moving to mount with offset = {self._pin_mounting_offset}" ) self.move_gonio_to_mount_position(offset=self._pin_mounting_offset) elif "samplealignment" in state: self.escape_goToSampleAlignment() except: zescape.reply("Maintenance") zescape.reply(self._escape_current_state) else: # JSON data = json.loads(msg) if "sampleName" in data: logger.debug(f"TELL SAMPLE DATA => {data}") self.tell2storage(data) zescape.reply("ack") elif "pin_offset" in data: logger.debug(f"TELL pin offset => {data}") self._pin_mounting_offset = data["pin_offset"] zescape.reply("ack") elif "get_pin_offset" in data: logger.debug(f"TELL get pin offset => {data}") zescape.reply_json({"pin_offset": self._pin_mounting_offset}) def tell2storage(self, sample): logger.debug(f"2 TELL SAMPLE DATA => {type(sample)}") self._le_prefix.setText(sample["sampleName"]) self._le_project.setText(sample["sampleFolder"]) tstf = folders.get_prefixed_file("_newsample") self.storage_cascade_prefix(None) logger.warning(f"sample info: {tstf}") def switch_task(self, index=0): stack = self._centerpiece_stack task = self._left_tabs.currentWidget().accessibleName() setup_task = self._setup_toolbox.currentWidget().accessibleName() method = self._tabs_daq_methods.currentWidget().accessibleName() logger.debug( "Task switching: top_tabs={} exp.method={} setup_task={}".format( task, method, setup_task ) ) if task == "task_experiment": stack.setCurrentIndex(0) active_task = method elif task == "task_setup": active_task = setup_task elif task == "task_sample_selection": active_task = TASK_SAMPLE_SELECTION stack.setCurrentIndex(1) else: logger.error("BUG: un-handled task") QMessageBox.warning( self, "BUG unhandled task!", "unhandled task: top_tabs={} method={}".format(task, method), ) self.set_active_task(active_task) self._status_task.setText(active_task) def set_active_task(self, task): logger.info("TASK == {}".format(task)) self._active_task = task def active_task(self): return self._active_task def is_task(self, task): return task == self._active_task def get_task_menu(self): pass def toggle_shutter(self, **kwargs): if self._pv_shutter: if 0 == self._pv_shutter.get(): self._pv_shutter.put(1) self._button_shutter.setText("shutter opened\n\u2622") else: self._pv_shutter.put(0) self._button_shutter.setText("shutter closed\n\u2622") elif self._has_pulse_picker: pulsePicker.toggle() def update_shutter_label(self, pvname, value, char_value, **kwargs): if 0 == value: self._button_shutter.setText("shutter closed") else: self._button_shutter.setText("shutter opened") def prepare_left_tabs(self): tabs = self._left_tabs tabs.currentChanged.connect(self.switch_task) setup_tab = self._tab_setup exp_tab = self._tab_experiment # add layouts exp_tab.setLayout(QVBoxLayout()) setup_tab.setLayout(QVBoxLayout()) setup_tab.setSizePolicy( QSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding) ) self._setup_toolbox = tbox = QToolBox() tbox.setSizePolicy(QSizePolicy(QSizePolicy.Expanding, QSizePolicy.Ignored)) # tbox.setMinimumWidth(itemWidget.sizeHint().width()) setup_tab.layout().addWidget(tbox) # Jungfrau parameters block = QWidget() block.setAccessibleName(TASK_JUNGFRAU_SETTINGS) block.setContentsMargins(0, 0, 0, 0) block.setLayout(QVBoxLayout()) self.jungfrau = jungfrau_widget.JungfrauWidget() block.layout().addWidget(self.jungfrau) block.layout().addStretch() tbox.addItem(block, "Jungfrau Parameters") # group beam box block = QWidget() block.setAccessibleName(TASK_SETUP_BEAM_CENTER) block.setContentsMargins(0, 0, 0, 0) block.setLayout(QVBoxLayout()) grp = QWidget() block.layout().addWidget(grp) block.layout().addStretch() grp.setLayout(QGridLayout()) tbox.addItem(block, "Beam Box") # setup_tab.layout().addWidget(grp) row = grp.layout().rowCount if self._pv_shutter is not None: but = QPushButton("open/close shutter") but.setCheckable(True) but.setChecked(1 == self._pv_shutter.get()) but.clicked.connect(self.toggle_shutter) self._button_shutter = but grp.layout().addWidget(but, 0, 0, 1, 2) self._pv_shutter.add_callback(self.update_shutter_label) else: self._picker = bernina_pulse_picker.PickerWidget() grp.layout().addWidget(self._picker, 0, 0, 1, 2) but = QPushButton("clear beam markers") but.clicked.connect(self.remove_beam_markers) grp.layout().addWidget(but, 1, 1) help = QTextBrowser() grp.layout().addWidget(help, 2, 0, 5, 2) help.setHtml( """

Updating Beam Mark

  1. clear beam markers
  2. for each zoom level, CTRL-Click (left mouse button) the center of the beam
""" ) # group regions block = QWidget() block.setAccessibleName(TASK_SETUP_ROI) block.setContentsMargins(0, 0, 0, 0) block.setLayout(QVBoxLayout()) grp = QWidget() block.layout().addWidget(grp) block.layout().addStretch() grp.setLayout(QGridLayout()) tbox.addItem(block, "Regions of Interest") but = QPushButton("add RECT roi") but.clicked.connect(self.roi_add_rect) grp.layout().addWidget(but, 0, 0) but = QPushButton("add LINE roi") but.clicked.connect(self.roi_add_line) grp.layout().addWidget(but, 0, 1) # group regions block = QWidget() block.setAccessibleName(TASK_SETUP_CAMERA) block.setContentsMargins(0, 0, 0, 0) block.setLayout(QVBoxLayout()) grp = QWidget() block.layout().addWidget(grp) block.layout().addStretch() grp.setLayout(QGridLayout()) tbox.addItem(block, "Camera Settings") but = QPushButton("remove transformations") but.clicked.connect(lambda bogus: self.modify_camera_transform("remove_all")) grp.layout().addWidget(but, 0, 0) but = QPushButton("remove last") but.clicked.connect(lambda bogus: self.modify_camera_transform("undo_last")) grp.layout().addWidget(but, 0, 1) but = QPushButton("Turn 90 CCW") but.clicked.connect( lambda bogus, n=camera.Transforms.ROTATE_90: self.modify_camera_transform(n) ) grp.layout().addWidget(but, 1, 0) but = QPushButton("Turn 90 CW") but.clicked.connect( lambda bogus, n=camera.Transforms.ROTATE_270: self.modify_camera_transform( n ) ) grp.layout().addWidget(but) but = QPushButton("Turn 180 CCW") but.clicked.connect( lambda bogus, n=camera.Transforms.ROTATE_180: self.modify_camera_transform( n ) ) grp.layout().addWidget(but) but = QPushButton("Transpose") but.clicked.connect( lambda bogus, n=camera.Transforms.TRANSPOSE: self.modify_camera_transform(n) ) grp.layout().addWidget(but) but = QPushButton("Flip L/R") but.clicked.connect( lambda bogus, n=camera.Transforms.FLIP_LR: self.modify_camera_transform(n) ) grp.layout().addWidget(but) but = QPushButton("Flip U/D") but.clicked.connect( lambda bogus, n=camera.Transforms.FLIP_UD: self.modify_camera_transform(n) ) grp.layout().addWidget(but) row = grp.layout().rowCount() self._label_transforms = QLabel() self._label_transforms.setWordWrap(True) self.modify_camera_transform(None) grp.layout().addWidget(self._label_transforms, row, 0, 1, 2) # group regions block = QWidget() block.setAccessibleName(TASK_SETUP_PPM_CALIBRATION) block.setContentsMargins(0, 0, 0, 0) block.setLayout(QVBoxLayout()) grp = QWidget() block.layout().addWidget(grp) block.layout().addStretch() grp.setLayout(QGridLayout()) tbox.addItem(block, "Pixel/MM Settings") grp.layout().addWidget(QLabel("feature size (µM)"), 0, 0) self._ppm_feature_size_spinbox = QDoubleSpinBox() self._ppm_feature_size_spinbox.setRange(5, 10000) self._ppm_feature_size_spinbox.setValue(50) self._ppm_feature_size_spinbox.setDecimals(0) self._ppm_feature_size_spinbox.setSuffix(" µM") grp.layout().addWidget(self._ppm_feature_size_spinbox, 0, 1) self._ppm_calibration = but = QPushButton("Start calibration") but.setCheckable(True) but.clicked.connect(lambda x: self.update_ppm_fitters()) grp.layout().addWidget(but, 1, 0, 1, 2) help = QTextBrowser() grp.layout().addWidget(help, 2, 0, 5, 2) help.setHtml( """

Pixel/MM Setup

  1. choose: feature size
  2. press: Start calibration
  3. for each zoom level, CTRL-Click (left mouse button) 2 locations which are feature-size apart
  4. un-click: Start calibration
""" ) # grp = PpmToolBox(viewbox=self.vb, camera=sample_camera, signals=self) # grp.setAccessibleName(TASK_SETUP_PPM_CALIBRATION_TBOX) # grp.pixelPerMillimeterCalibrationChanged.connect(self.update_ppm_fitters) # tbox.addItem(grp, "PPM ToolBox") # self._ppm_toolbox = grp tbox.currentChanged.connect(self.switch_task) # final stretch # setup_tab.layout().addStretch() exp_tab.layout().addStretch() # DAQ Methods Tabs self.build_daq_methods_prelocated_tab() self.build_daq_methods_grid_tab() self.build_sample_selection_tab() def build_sample_selection_tab(self): # self._sample_selection = sample_selection.SampleSelection(self) # self._sample_selection.move_to_mount_position = ( # self.move_gonio_to_mount_position # ) # self._tab_sample_selection.setLayout(QVBoxLayout()) # self._tab_sample_selection.layout().addWidget(self._sample_selection) # self._tab_sample_selection.layout().addStretch(2) pass def build_embl_group(self): grp = QGroupBox("EMBL Acquisition") layout = QFormLayout() grp.setLayout(layout) layout.addWidget(QLabel("Prefix")) self._embl_prefix = QLineEdit("img") layout.addWidget(self._embl_prefix) def abort_measurement(self, ev=None): if settings.value(ACTIVATE_PULSE_PICKER): pulsePicker.close() jungfrau_detector.abort() delta_tau.abort() logger.debug("aborting measurement") def trigger_detector(self, **kwargs): if self._pv_shutter is not None: self._pv_shutter.put(0) # self._eiger_button_collect.show() # self._eiger_button_abort.hide() # self._eiger_now_collecting_label.setText( # "Finished sequence id: {}\n" # "Data in: Data10/{}".format( # self._detector_sequence_id, self._eiger_now_collecting_file # ) # ) def modify_camera_transform(self, t): if t == "remove_all": sample_camera.set_transformations([]) elif t == "undo_last": sample_camera._transformations.pop() elif type(t) ==type(camera.Transforms): sample_camera.append_transform(t) try: label = ", ".join([t.name for t in sample_camera._transformations]) except: label = "" self._label_transforms.setText(label) settings.setValue(CAMERA_TRANSFORMATIONS, sample_camera._transformations) def roi_add_line(self): roi = pg.LineSegmentROI( [200, 200], [300, 300], pen="r", scaleSnap=True, translateSnap=True, rotateSnap=True, removable=True, ) # roi.sigRegionChanged.connect(self.track_roi) roi.sigRemoveRequested.connect(self.remove_roi) self.vb.addItem(roi) self._rois.append(roi) def roi_add_rect(self): roi = pg.RectROI( [200, 200], [50, 50], pen="r", scaleSnap=True, translateSnap=True, rotateSnap=True, removable=True, ) roi.sigRegionChanged.connect(self.track_roi) roi.sigRemoveRequested.connect(self.remove_roi) self.vb.addItem(roi) self._rois.append(roi) def remove_roi(self, roi): self.vb.removeItem(roi) self._rois.remove(roi) def prepare_embl_gui(self): self._tab_daq_method_embl.setLayout(QVBoxLayout()) layout = self._tab_daq_method_embl.layout() motors = self.get_gonio_motors() self._embl_module = EmblWidget(self) self._embl_module.configure(motors, sample_camera, qoptic_zoom) layout.addWidget(self._embl_module) def prepare_microscope_page(self): layout = self.microscope_page.layout() container = QWidget() hlay = QHBoxLayout() container.setLayout(hlay) layout.addWidget(container) def update_beam_marker(self, zoom_lvl): w, h = settings.value(BEAM_SIZE) try: bx = self.beamx_fitter(zoom_lvl) by = self.beamy_fitter(zoom_lvl) ok = True except: ok = False logger.warning("beam marker not defined") return logger.debug("updating beam mark to {:.1f}x{:.1f}".format(bx, by)) self.beamCameraCoordinatesChanged.emit(bx, by) def update_beam_marker_fitters(self): if len(self._beam_markers) > 2: logger.debug("defining beam marker") bx = [(n, x[0]) for n, x in self._beam_markers.items()] by = [(n, x[1]) for n, x in self._beam_markers.items()] nbx = np.asarray(bx).T nby = np.asarray(by).T bx_coefs = np.polyfit(nbx[0], nbx[1], 3) by_coefs = np.polyfit(nby[0], nby[1], 3) logger.debug(".... beam marker X coeficients {}".format(bx_coefs)) logger.debug(".... beam marker Y coeficients {}".format(by_coefs)) self.beamx_fitter = np.poly1d(bx_coefs) self.beamy_fitter = np.poly1d(by_coefs) def update_ppm_fitters(self, calib=None): if calib is not None: logger.info("received new calibration for PPM") self._zoom_to_ppm = calib # self._zoom_to_ppm = { # FIXME: eventually automate # 1: 830, # 100: 940, # 200: 1220, # 400: 1950, # 600: 3460, # 800: 5400, # 900: 7150, # 1000: 9200, # } num_points = len(self._zoom_to_ppm) if num_points < 2: return elif num_points < 4: order = 2 elif num_points < 6: order = 3 else: order = 4 logger.debug( "polynomial fitting using {} data points of order {}".format( num_points, order ) ) bx = [(z, ppm) for z, ppm in self._zoom_to_ppm.items()] nbx = np.asarray(bx).T bx_coefs = np.polyfit(nbx[0], nbx[1], order) logger.debug(".... ppm fitting coeficients {}".format(bx_coefs)) self.ppm_fitter = np.poly1d(bx_coefs) def getFastX(self): return self.tweakers["fast_x"].motor.get_position() def getFastY(self): return self.tweakers["fast_y"].motor.get_position() def zoom_changed_cb(self, value): self.zoomChanged.emit(value) try: ppm = self.ppm_fitter(value) except AttributeError as e: logger.warning(e) else: logger.debug("zoom callback zoomChanged.emit({}), pixelsPerMillimeter.emit({})".format(value, ppm)) self.pixelsPerMillimeter.emit(ppm) self.update_beam_marker(value) def getZoom(self): return qoptic_zoom.get_position() def getPpm(self): ppm_fitter = None try: ppm_fitter = self.ppm_fitter(self.getZoom()) except Exception as e: logger.error("garbage in getPpm()") traceback.print_exc() return ppm_fitter def append_to_beam_markers(self, x, y, zoom): self._beam_markers[zoom] = (x, y) logger.info("beam markers {}".format(self._beam_markers)) settings.setValue(BEAM_MARKER_POSITIONS, self._beam_markers) self.update_beam_marker_fitters() def remove_beam_markers(self): self._beam_markers = {} self.beamx_fitter = None self.beamy_fitter = None def track_roi(self, roi): x, y = roi.pos() w, h = roi.size() # area = roi.getArrayRegion(self._im, self.img) # sum = np.sum(area) # logger.info('{} => sum {}'.format((x,y), sum)) bx, by = x + w / 2., y + h / 2. logger.info("beam pos {}".format((bx, by))) logger.info("marker pos = {} ; size = {}".format((x, y), (w, h))) def toggle_mouse_tracking(self): if self._mouse_tracking: self.disengage_mouse_tracking() else: self.engage_mouse_tracking() def engage_mouse_tracking(self): self.glw.scene().sigMouseMoved.connect(self.mouse_move_event) self.glw.scene().sigMouseMoved.emit() self._mouse_tracking = True def disengage_mouse_tracking(self): self.glw.scene().sigMouseMoved.disconnect(self.mouse_move_event) self._mouse_tracking = False self._lb_coords.setText("") def mouse_move_event(self, pos): self._mouse_pos = pos task = self.active_task() xy = self.img.mapFromScene(pos) z = qoptic_zoom.get_position() # if self._ppm_toolbox._force_ppm > 0: # ppm = self._ppm_toolbox._force_ppm # else: try: ppm = self.ppm_fitter(z) except: ppm = 0 # do not move if we don't know ppm x, y = xy.x(), xy.y() try: bx, by = self.get_beam_mark_on_camera_xy() except: bx, by = 500, 500 dx = (x - bx) / ppm dy = -(y - by) / ppm fx = self.tweakers["fast_x"].motor.get_position() fy = self.tweakers["fast_y"].motor.get_position() fx += dx fy += dy self._lb_coords.setText( "\u23FA{:>}:{:>6.0f} {:<.0f}" "\u23FA{:>}:{:>6.0f} {:<.0f}" "\u23FA{:>}:{:<.0f}" "\u23FA{:>}:{:>6.1f} {:<.1f} \u00B5" "\u23FA{:>}:{:>7.3f} {:<.3f} mm".format( "Beam at", bx, by, "Pixel coord ", x, y, "PPM", ppm, "Distance to beam", 1000 * dx, 1000 * dy, "Stage", fx, fy, ) ) def mouse_click_event(self, event): fx_motor, fy_motor, bx_motor, bz_motor, omega_motor = self.get_gonio_tweakers() logger.debug("{}".format(event)) logger.debug(" pos {}".format(event.pos())) logger.debug("screen pos {}".format(event.screenPos())) logger.debug("scene pos {}".format(event.scenePos())) task = self.active_task() if event.button() == Qt.RightButton: print_transform(self.vb.childTransform()) event.ignore() return pos = event.scenePos() zoom_level = qoptic_zoom.get_position() logger.debug(" zoom {}".format(zoom_level)) try: ppm = self.ppm_fitter(zoom_level) except: ppm = 0 # do not move if we don't know ppm dblclick = event.double() shft = Qt.ShiftModifier & event.modifiers() ctrl = Qt.ControlModifier & event.modifiers() alt = Qt.AltModifier & event.modifiers() xy = self.img.mapFromScene(pos) x, y = xy.x(), xy.y() bx, by = self.get_beam_mark_on_camera_xy() a = np.asarray([x, y]) b = np.asarray([bx, by]) pixel_dist = np.linalg.norm(a - b) logger.debug("distance to beam: {:.1f} pixels".format(pixel_dist)) omega = omega_motor.motor.get_position() logger.debug("omega at {:.03f} degrees".format(omega)) fx = fx_motor.motor.get_position() fy = fy_motor.motor.get_position() dx = (x - bx) / ppm dy = -(y - by) / ppm fx += dx fy += dy logger.debug("click at : {:.3f} {:.3f}".format(x, y)) logger.debug("beam at : {:.3f} {:.3f}".format(bx, by)) logger.debug("gonio at : {:.3f} {:.3f}".format(fx, fy)) # publisher.submit( # {"event": "gonio", "data": self.get_gonio_positions(as_json=True)} # ) logger.debug("TASK = {}".format(task)) # Click without modifers moves clicked position to beam mark if task not in (TASK_SETUP_PPM_CALIBRATION, TASK_SETUP_BEAM_CENTER): if not (ctrl or shft or alt): if ppm == 0: logger.warning("PPM is not defined - do the PPM calibration") if task == TASK_HELICAL: dx = (x - bx) / ppm dy = -(y - by) / ppm logger.info("moving base_X, fast_Y {:.3f}, {:.3f}".format(dx, dy)) bx_motor.move_relative(dx) fy_motor.move_relative(dy) else: dx = np.cos(omega * np.pi / 180) * (x - bx) / ppm dy = -(y - by) / ppm logger.info("moving fast x/y to {:.3f}, {:.3f}".format(dx, dy)) fx_motor.move_relative(dx) fy_motor.move_relative(dy) self.fast_dx_position.emit(dx) self.fast_dy_position.emit(dy) return # CTRL-Click always just moves Z if ctrl and not (shft or alt): mm = np.sign(y - by) * pixel_dist / 10000. logger.debug("moving Z stage by {:.3f} mm".format(mm)) bz_motor.move_relative(mm) return if task in (TASK_SETUP_PPM_CALIBRATION, TASK_SETUP_BEAM_CENTER): if ctrl and self._ppm_calibration.isChecked(): if self._ppm_click is not None: feature_dist = self._ppm_feature_size_spinbox.value() / 1000. b = self._ppm_click pixel_dist = np.linalg.norm(a - b) ppm = pixel_dist / feature_dist logger.debug("calib save a {}".format(a)) logger.debug("calib save b {}".format(b)) logger.debug(" feature {}".format(feature_dist)) logger.debug("a -> b pixels {}".format(pixel_dist)) logger.debug(" ppm({}) {}".format(zoom_level, pixel_dist)) self._zoom_to_ppm[zoom_level] = ppm settings.setValue(CAMERA_ZOOM_TO_PPM, self._zoom_to_ppm) self._ppm_click = None logger.debug("ppm data so far {}".format(self._zoom_to_ppm)) else: logger.debug("calib save b {}".format(a)) self._ppm_click = a return elif task == TASK_SETUP_BEAM_CENTER: if ctrl and not (shft or alt): logger.info( "beam mark add: zoom {} => {:.1f} {:.1f}".format( zoom_level, x, y ) ) self.append_to_beam_markers(x, y, zoom_level) else: logger.warning("click combination not available") elif task == TASK_POINT_SHOOT: pass elif task == TASK_GRID: if shft and not (ctrl or alt): self.addGridRequest.emit(fx, fy) elif task == TASK_PRELOCATED: if ctrl and shft: omegacos = option(DELTATAU_OMEGACOS) if omegacos: fx = fx / np.cos(omega * np.pi / 180) logger.info( "selectin fiducial (omegacos): {:.3f}, {:.3f}".format(fx, fy) ) else: logger.info("selectin fiducial: {:.3f}, {:.3f}".format(fx, fy)) self.fiducialPositionSelected.emit(x, y, fx, fy) elif shft and alt: self.appendPrelocatedPosition.emit(x, y, fx, fy) elif task == TASK_HELICAL: logger.info("no action associated ") elif task == TASK_EMBL: pass else: pass def get_beam_mark_on_camera_xy(self): z = qoptic_zoom.get_position() try: bx = self.beamx_fitter(z) by = self.beamy_fitter(z) except: bx, by = 500, 500 return (bx, by) def safe_backlight_move(self, pos): # any move of backlight requires post sample tube out try: self.assert_post_tube_position(pos="out") except: self.move_post_tube("out") backlight.move(pos) def escape_run_steps(self, steps, title): with pg.ProgressDialog(title, 0, len(steps)) as dlg: for step in steps: step() dlg += 1 if dlg.wasCanceled(): raise TransitionAborted("ABORTED" + title) def escape_goToSampleExchange(self): self._escape_current_state = "busy" steps = [] if option(CRYOJET_MOTION_ENABLED): steps.append(lambda: self.move_cryojet_nozzle("out")) steps.extend( [ lambda: self.move_post_tube("out"), lambda: backlight.move("out", wait=True), lambda: self.move_collimator("out"), ] ) self.escape_run_steps(steps, "Transitioning to Sample Exchange") self._escape_current_state = "ManualSampleExchange" def escape_goToSampleAlignment(self): self._escape_current_state = "busy" steps = [ # lambda: sample_selection.tell.set_current(30.0), lambda: self.move_collimator("ready") ] if option(CRYOJET_MOTION_ENABLED): steps.extend([lambda: self.move_cryojet_nozzle("in")]) steps.extend([lambda: self.move_post_tube("out"), lambda: backlight.move("in")]) self.escape_run_steps(steps, "Transitioning to Sample Alignment") self._escape_current_state = "SampleAlignment" def escape_goToDataCollection(self): self._escape_current_state = "busy" steps = [ # lambda: sample_selection.tell.set_current(30.0), lambda: backlight.move("out", assert_positions=True), lambda: self.move_post_tube("in"), lambda: self.move_collimator("in"), ] if option(CRYOJET_MOTION_ENABLED): steps.extend([lambda: self.move_cryojet_nozzle("in")]) self.escape_run_steps(steps, "Transitioning to Data Collection") self._escape_current_state = "DataCollection" def move_gonio_to_position(self, fx, fy, bx, bz, omega): self.tweakers["fast_x"].motor.move(fx, wait=False, ignore_limits=True) self.tweakers["fast_y"].motor.move(fy, wait=False, ignore_limits=True) self.tweakers["base_x"].motor.move(bx, wait=False, ignore_limits=True) self.tweakers["base_z"].motor.move(bz, wait=False, ignore_limits=True) self.tweakers["omega"].motor.move(omega, wait=False, ignore_limits=True) def get_gonio_motors(self, as_json=False): if as_json: return { "fast_x": self.tweakers["fast_x"].motor, "fast_y": self.tweakers["fast_y"].motor, "base_x": self.tweakers["base_x"].motor, "base_z": self.tweakers["base_z"].motor, "omega": self.tweakers["omega"].motor, } else: return ( self.tweakers["fast_x"].motor, self.tweakers["fast_y"].motor, self.tweakers["base_x"].motor, self.tweakers["base_z"].motor, self.tweakers["omega"].motor, ) def get_gonio_tweakers(self): return ( self.tweakers["fast_x"], self.tweakers["fast_y"], self.tweakers["base_x"], self.tweakers["base_z"], self.tweakers["omega"], ) def get_gonio_positions(self, as_json: bool = False): fx, fy, cx, cz, omega = ( self.tweakers["fast_x"].motor, self.tweakers["fast_y"].motor, self.tweakers["base_x"].motor, self.tweakers["base_z"].motor, self.tweakers["omega"].motor, ) a, b, c, d, e = ( fx.get_position(), fy.get_position(), cx.get_position(), cz.get_position(), omega.get_position(), ) if as_json: return {"fx": a, "fy": b, "bx": c, "bz": d, "omega": e} else: return (a, b, c, d, e) def escape_goToTellMountPosition(self): self.move_gonio_to_mount_position() self.lock_goniometer() def move_gonio_to_mount_position(self, offset: float = 0.0): fx, fy, cx, cz, omega = self.get_gonio_motors() bmark = "bookmark_0" try: t_fx = float(settings.value(bmark + "/mount_fx")) t_fy = -offset + float(settings.value(bmark + "/mount_fy")) t_cx = float(settings.value(bmark + "/mount_cx")) t_cz = float(settings.value(bmark + "/mount_cz")) t_omega = float(settings.value(bmark + "/mount_omega")) except: raise IncompleteConfiguration( "TELL sample changer mount position is not configured!!!" ) fx.move(t_fx, wait=True, ignore_limits=True) fy.move(t_fy, wait=True, ignore_limits=True) cx.move(t_cx, wait=True, ignore_limits=True) cz.move(t_cz, wait=True, ignore_limits=True) omega.move(t_omega, wait=True, ignore_limits=True) app_utils.assert_motor_positions( [ (fx, t_fx, 0.01), (fy, t_fy, 0.01), (cx, t_cx, 0.01), (cz, t_cz, 0.01), (omega, t_omega, 0.01), ] ) self.escape_goToSampleExchange() def lock_goniometer(self): # tell.set_in_mount_position(True) res = QMessageBox.question( self, "", "Mount a sample from console and click ok once the sample is mounted.", QMessageBox.Ok, QMessageBox.Ok, ) res = QMessageBox.question( self, "", "Is the sample is mounted?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No, ) if res in (QMessageBox.Yes, QMessageBox.No): # tell.set_in_mount_position(False) pass @pyqtSlot(int) def saveBookmark(self, key: int): """save a bookmark for the corresponding key is the Qt.Key_* code """ fx, fy, cx, cz, omega = ( self.tweakers["fast_x"].motor, self.tweakers["fast_y"].motor, self.tweakers["base_x"].motor, self.tweakers["base_z"].motor, self.tweakers["omega"].motor, ) bmark = "bookmark_{}".format(key) if key == 0: ans = QMessageBox.question( self, "Override TELL mount position", "This will overwrite the positions used for TELL MOUNTING!!!\n\n\tContinue ?", ) if ans != QMessageBox.Yes: return logger.info( "saving bookmark {}: {}, {}, {}, {}, {}".format( bmark, fx.get_position(), fy.get_position(), cx.get_position(), cz.get_position(), omega.get_position(), ) ) settings.setValue(bmark + "/mount_fx", fx.get_position()) settings.setValue(bmark + "/mount_fy", fy.get_position()) settings.setValue(bmark + "/mount_cx", cx.get_position()) settings.setValue(bmark + "/mount_cz", cz.get_position()) settings.setValue(bmark + "/mount_omega", omega.get_position()) def gotoBookmark(self, key: int): """save a bookmark for the corresponding key""" fx, fy, cx, cz, omega = ( self.tweakers["fast_x"].motor, self.tweakers["fast_y"].motor, self.tweakers["base_x"].motor, self.tweakers["base_z"].motor, self.tweakers["omega"].motor, ) bmark = "bookmark_{}".format(key) try: t_fx = float(settings.value(bmark + "/mount_fx")) t_fy = float(settings.value(bmark + "/mount_fy")) t_cx = float(settings.value(bmark + "/mount_cx")) t_cz = float(settings.value(bmark + "/mount_cz")) t_omega = float(settings.value(bmark + "/mount_omega")) except: return fx.move(t_fx, wait=True, ignore_limits=True) fy.move(t_fy, wait=True, ignore_limits=True) cx.move(t_cx, wait=True, ignore_limits=True) cz.move(t_cz, wait=True, ignore_limits=True) omega.move(t_omega, wait=True, ignore_limits=True) def move_collimator(self, pos): cx = self.tweakers["colli_x"] cy = self.tweakers["colli_y"] x_pos = settings.value("collimator/x_in", 1e10, type=float) y_pos = settings.value("collimator/y_in", 1e10, type=float) dx = settings.value("collimator/dx", 1e10, type=float) dy = settings.value("collimator/dy", 1e10, type=float) if 1e9 < x_pos + y_pos + dx + dy: raise IncompleteConfiguration("COLLIMATOR configuration is incomplete!") logger.info( "moving collimator {} to X,Y = {:.3f}, {:.3f}".format(pos, x_pos, y_pos) ) if pos == "out": cy.move_motor_to_position(y_pos + dy, assert_position=True) cx.move_motor_to_position(x_pos + dx, assert_position=True) elif pos == "in": cx.move_motor_to_position(x_pos, assert_position=True) cy.move_motor_to_position(y_pos, assert_position=True) elif pos == "ready": cx.move_motor_to_position(x_pos, assert_position=True) cy.move_motor_to_position(y_pos + dy, assert_position=True) else: raise UnknownLabeledPosition("Collimator position *{}* is not known!!") def move_cryojet_nozzle(self, pos): cx = self.tweakers["cryo"] if "in" == pos.lower(): key = CRYOJET_NOZZLE_IN elif "out" == pos.lower(): key = CRYOJET_NOZZLE_OUT to_pos = settings.value(key, 1e10, type=float) if to_pos > 1e9: raise IncompleteConfiguration( f"CRYOJET configuration is incomplete! Missing {key}" ) cx.move_motor_to_position(to_pos, assert_position=True) def setup_sliders(self): cont = self._rightmost self._tweak_container = QWidget() self._tweak_container.setLayout(QVBoxLayout()) cont.layout().addWidget(self._tweak_container) layout = self._tweak_container.layout() layout.setSpacing(0) layout.setContentsMargins(0, 0, 0, 0) if self._at_x06sa: zoom_base = "ESBMX-SAMCAM" # fetura IOC by Jose Gabadinho elif self._at_lab_eh060: zoom_base = "/dev/ttyUSB0" # direct connection using fetura.py package elif self._at_cristalina: zoom_base = ( "rest://pc12818.psi.ch:9999" ) # direct connection using fetura.py package if self._has_camera: samcam = sample_camera else: samcam = None self.zoombox = zoom.Zoom() self.zoombox.configure(camera=samcam) self.zoombox.zoomChanged.connect(self.zoom_changed_cb) self.zoombox.moveBacklight.connect(self.safe_backlight_move) layout.addWidget(self.zoombox) toolbox = QToolBox() layout.addWidget(toolbox) qutilities.add_item_to_toolbox( toolbox, "Fast Stage", widget_list=[ # self.get_tweaker('SAR-EXPMX:MOT_BLGT', alias='backlight', label='backlight'), self.get_tweaker("SAR-EXPMX:MOT_FY", alias="fast_y", label="fast Y"), self.get_tweaker("SAR-EXPMX:MOT_FX", alias="fast_x", label="fast X"), self.get_tweaker( "SAR-EXPMX:MOT_ROT_Y", alias="omega", label="omega", tweak_min=0.001, tweak_max=180.0, ), self.get_tweaker("SAR-EXPMX:MOT_CX", alias="base_x", label="base X"), self.get_tweaker("SAR-EXPMX:MOT_CZ", alias="base_z", label="base Z"), ], ) # qutilities.add_item_to_toolbox( # toolbox, # "Wedge Mover", # widget_list=[ # self.get_tweaker( # "SAR-EXPMX:MOT_WEDGE1", alias="wedge_1", label="wedge_1" # ), # self.get_tweaker( # "SAR-EXPMX:MOT_WEDGE2", alias="wedge_2", label="wedge_2" # ), # self.get_tweaker( # "SAR-EXPMX:MOT_WEDGE3", alias="wedge_3", label="wedge_3" # ), # self.get_tweaker( # "SAR-EXPMX:MOT_WEDGE4", alias="wedge_4", label="wedge_4" # ), # self.get_tweaker( # "SAR-EXPMX:MOT_WEDGEX", alias="wedge_x", label="wedge_x" # ), # self.get_tweaker( # "SAR-EXPMX:MOT_WEDGEY", alias="wedge_y", label="wedge_y" # ), # self.get_tweaker( # "SAR-EXPMX:MOT_WEDGEA", alias="wedge_a", label="wedge_a" # ), # self.get_tweaker( # "SAR-EXPMX:MOT_WEDGEB", alias="wedge_b", label="wedge_b" # ), # ], # ) # # self.build_slits_group(toolbox) self.build_collimator_group(toolbox) self.build_posttube_group(toolbox) self.build_xeye_group(toolbox) self.build_cryo_group(toolbox) # monitor all axis for an axis fault for key, tweaker in self.tweakers.items(): tweaker.event_axis_fault.connect(self.axis_fault) self.tweakers["fast_x"].event_readback.connect( lambda alias, value, kw: self.fast_x_position.emit(value) ) self.tweakers["fast_y"].event_readback.connect( lambda alias, value, kw: self.fast_y_position.emit(value) ) # layout.addStretch() def axis_fault(self, pvname, kw): """ swissmx - {'pvname': 'SAR-EXPMX:MOT_FY.STAT', 'value': 0, 'char_value': 'NO_ALARM', 'status': 0, 'ftype': 17, 'chid': 38980392, 'host': 'SAR-CPPM-EXPMX1.psi.ch:5064', 'count': 1, 'access': 'read-only', 'write_access': False, 'read_access': True, 'severity': 0, 'timestamp': 1537867290.914189, 'precision': None, 'units': None, 'enum_strs': ('NO_ALARM', 'READ', 'WRITE', 'HIHI', 'HIGH', 'LOLO', 'LOW', 'STATE', 'COS', 'COMM', 'TIMEOUT', 'HWLIMIT', 'CALC', 'SCAN', 'LINK', 'SOFT'), 'upper_disp_limit': None, 'lower_disp_limit': None, 'upper_alarm_limit': None, 'lower_alarm_limit': None, 'lower_warning_limit': None, 'upper_warning_limit': None, 'upper_ctrl_limit': None, 'lower_ctrl_limit': None, 'nelm': 1, 'type': 'time_enum', 'typefull': 'time_enum', 'motor_field': 'STAT', 'source_field': 'STAT', 'cb_info': (1, ), 'alias': 'fast Y'} :param pvname: :param kw: :return: """ logger.debug(f"AXIS FAULT: {kw}") if kw["severity"]: msg = f"axis {pvname} has a fault" logger.critical(msg) logger.critical(f"{kw}") else: msg = "" self._message_critical_fault.setText(msg) def build_cryo_group(self, toolbox): qutilities.add_item_to_toolbox( toolbox, "Cryojet", widget_list=[ self.get_tweaker("SAR-EXPMX:MOT_CRYO", alias="cryo", label="cryo X") ], ) def build_collimator_group(self, toolbox): qutilities.add_item_to_toolbox( toolbox, "Collimator", widget_list=[ self.get_tweaker( "SARES30-ESBMX1", alias="colli_x", label="colli X", mtype="smaract_motor", ), self.get_tweaker( "SARES30-ESBMX2", alias="colli_y", label="colli Y", mtype="smaract_motor", ), ], ) def build_slits_group(self, toolbox): qutilities.add_item_to_toolbox( toolbox, "Slits", widget_list=[ self.get_tweaker( "SARES30-ESBMX10", alias="slit_right", label="left", mtype="smaract_motor", ), self.get_tweaker( "SARES30-ESBMX11", alias="slit_left", label="right", mtype="smaract_motor", ), self.get_tweaker( "SARES30-ESBMX12", alias="slit_bottom", label="bottom", mtype="smaract_motor", ), self.get_tweaker( "SARES30-ESBMX13", alias="slit_top", label="top", mtype="smaract_motor", ), ], ) def build_xeye_group(self, toolbox): qutilities.add_item_to_toolbox( toolbox, "X-Ray Eye", widget_list=[ self.get_tweaker( "SARES30-ESBMX14", alias="xeye_x", label="X", mtype="smaract_motor" ), self.get_tweaker( "SARES30-ESBMX15", alias="xeye_y", label="Y", mtype="smaract_motor" ), ], ) def build_posttube_group(self, toolbox): widgets = [ self.get_tweaker( "SARES30-ESBMX4", alias="tube_usx", label="upstream X", mtype="smaract_motor", ), self.get_tweaker( "SARES30-ESBMX6", alias="tube_usy", label="upstream Y", mtype="smaract_motor", ), self.get_tweaker( "SARES30-ESBMX5", alias="tube_dsx", label="downstream X", mtype="smaract_motor", ), self.get_tweaker( "SARES30-ESBMX7", alias="tube_dsy", label="downstream Y", mtype="smaract_motor", ), self.get_tweaker( "SARES30-ESBMX8", alias="tube_z", label="tube Z", mtype="smaract_motor" ), ] c = QWidget() c.setLayout(QGridLayout()) but = QPushButton("post tube out") but.clicked.connect(lambda m, dir="out": self.move_post_tube(dir)) c.layout().addWidget(but, 0, 0) but = QPushButton("post tube in") but.clicked.connect(lambda m, dir="in": self.move_post_tube(dir)) c.layout().addWidget(but, 0, 1) widgets.append(c) label = "Post Sample Tube" block = QWidget() block.setAccessibleName(label) block.setContentsMargins(0, 0, 0, 0) block.setLayout(QVBoxLayout()) for w in widgets: block.layout().addWidget(w) block.layout().addWidget(self.build_post_tube_tandem_tweaker()) block.layout().addStretch() toolbox.addItem(block, label) def build_post_tube_tandem_tweaker(self): block = QWidget() block.setAccessibleName("post tube tamden") block.setContentsMargins(0, 0, 0, 0) block.setLayout(QVBoxLayout()) c = QWidget() glay = QGridLayout() c.setLayout(glay) b = QWidget() blay = QHBoxLayout() b.setLayout(blay) blay.addWidget(QLabel("Tweak (mm)")) self._post_tandem_tweak_val = QLineEdit("1.0") blay.addWidget(self._post_tandem_tweak_val) glay.addWidget(b, 0, 0, 1, 2) icon = qtawesome.icon("material.arrow_back") but = QPushButton(icon, "") but.clicked.connect(lambda m, dir="x-pos": self.move_post_tube(dir)) glay.addWidget(but, 1, 0) # c.setLayout(glay) icon = qtawesome.icon("material.arrow_forward") but = QPushButton(icon, "") but.clicked.connect(lambda m, dir="x-neg": self.move_post_tube(dir)) glay.addWidget(but, 1, 1) # c.setLayout(glay) icon = qtawesome.icon("material.arrow_downward") but = QPushButton(icon, "") but.clicked.connect(lambda m, dir="down": self.move_post_tube(dir)) glay.addWidget(but, 2, 0) # c.setLayout(glay) icon = qtawesome.icon("material.arrow_upward") but = QPushButton(icon, "") but.clicked.connect(lambda m, dir="up": self.move_post_tube(dir)) glay.addWidget(but, 2, 1) block.layout().addWidget(c) return block def assert_post_tube_position(self, pos): x_up = settings.value("post_sample_tube/x_up", 1e10, type=float) y_up = settings.value("post_sample_tube/y_up", 1e10, type=float) x_down = settings.value("post_sample_tube/x_down", 1e10, type=float) y_down = settings.value("post_sample_tube/y_down", 1e10, type=float) dx = settings.value("post_sample_tube/dx", 1e10, type=float) dy = settings.value("post_sample_tube/dy", 1e10, type=float) tz_in = settings.value("post_sample_tube/z_in", 1e10, type=float) tz_out = settings.value("post_sample_tube/z_out", 1e10, type=float) if (x_up + y_up + x_down + y_down + dx + dy + tz_in + tz_out) > 1e9: raise Exception("miscofingured positions for post-sample tube") usy = self.tweakers["tube_usy"] dsy = self.tweakers["tube_dsy"] usx = self.tweakers["tube_usx"] dsx = self.tweakers["tube_dsx"] tbz = self.tweakers["tube_z"] if pos == "in": yu = y_up xu = x_up yd = y_down xd = x_down z = tz_in elif pos == "out": yu = y_up + dy xu = x_up + dx yd = y_down + dy xd = x_down + dx z = tz_out app_utils.assert_tweaker_positions( [ (usy, yu, 0.1), (dsy, yd, 0.1), (usx, xu, 0.1), (dsx, xd, 0.1), (tbz, z, 0.1), ], timeout=2.0, ) def move_post_tube(self, dir): try: tandem_twv = float(self._post_tandem_tweak_val.text()) except: tandem_twv = 1.0 x_up = settings.value("post_sample_tube/x_up") y_up = settings.value("post_sample_tube/y_up") x_down = settings.value("post_sample_tube/x_down") y_down = settings.value("post_sample_tube/y_down") dx = settings.value("post_sample_tube/dx") dy = settings.value("post_sample_tube/dy") tz_in = settings.value("post_sample_tube/z_in") tz_out = settings.value("post_sample_tube/z_out") if x_up is None: msg = "SwissMX *POST-SAMPLE-TUBE* configuration is incomplete!!!" logger.warning(msg) QMessageBox.warning(self, "post tube not configured", msg) return x_up = float(x_up) y_up = float(y_up) x_down = float(x_down) y_down = float(y_down) dx = float(dx) dy = float(dy) tz_in = float(tz_in) tz_out = float(tz_out) usy = self.tweakers["tube_usy"] dsy = self.tweakers["tube_dsy"] usx = self.tweakers["tube_usx"] dsx = self.tweakers["tube_dsx"] tube_z = self.tweakers["tube_z"] if dir == "in": logger.info("move post sample tube in") usy.move_motor_to_position(y_up) dsy.move_motor_to_position(y_down) usx.move_motor_to_position(x_up) dsx.move_motor_to_position(x_down) try: app_utils.assert_tweaker_positions( [ (usy, y_up, 0.1), (dsy, y_down, 0.1), (usx, x_up, 0.1), (dsx, x_down, 0.1), ], timeout=10.0, ) except app_utils.PositionsNotReached as e: logger.warning("failed to move post sample tube {}".format(dir)) logger.warning(e) QMessageBox.warning( self, "failed to move post sample tube XY {in}", "failed to move post sample tube XY {in}", ) raise tube_z.move_motor_to_position(tz_in, wait=True) try: app_utils.assert_tweaker_positions([(tube_z, tz_in, 0.1)]) except app_utils.PositionsNotReached as e: logger.warning("failed to move post sample tube Z {in}") logger.warning(e) QMessageBox.warning( self, "failed to move post sample tube Z {in}", "failed to move post sample tube Z {in}", ) raise elif dir == "out": logger.info("move post sample tube out") tube_z.move_motor_to_position(tz_out, wait=True) try: app_utils.assert_tweaker_positions([(tube_z, tz_out, 0.1)]) except app_utils.PositionsNotReached as e: logger.warning("failed to move post sample tube {out}") logger.warning(e) QMessageBox.warning( self, "failed to move post sample tube Z {out}", "failed to move post sample tube Z {out}", ) raise usy.move_motor_to_position(y_up + dy) dsy.move_motor_to_position(y_down + dy) usx.move_motor_to_position(x_up + dx) dsx.move_motor_to_position(x_down + dx) try: app_utils.assert_tweaker_positions( [ (usy, y_up + dy, 0.1), (dsy, y_down + dy, 0.1), (usx, x_up + dx, 0.1), (dsx, x_down + dx, 0.1), ], timeout=10.0, ) except app_utils.PositionsNotReached as e: logger.warning("failed to move post sample tube {}".format(dir)) logger.warning(e) QMessageBox.warning( self, "failed to move post sample tube XY {out}", "failed to move post sample tube XY {out}", ) raise elif dir == "x-pos": logger.info( "tamdem move post sample tube X-pos by {} mm".format(tandem_twv) ) usx.move_relative(tandem_twv) dsx.move_relative(tandem_twv) elif dir == "x-neg": logger.info("tamdem move post sample tube X-neg {} mm".format(tandem_twv)) usx.move_relative(-tandem_twv) dsx.move_relative(-tandem_twv) elif dir == "up": logger.info("tamdem move post sample tube UP {} mm".format(tandem_twv)) usy.move_relative(tandem_twv) dsy.move_relative(tandem_twv) elif dir == "down": logger.info("tamdem move post sample tube DOWN {} mm".format(tandem_twv)) usy.move_relative(-tandem_twv) dsy.move_relative(-tandem_twv) def get_tweaker( self, pv, alias=None, label=None, mtype="epics_motor", layout=None, **kwargs ): if mtype == "epics_motor": m = MotorTweak() else: m = SmaractMotorTweak() m.connect_motor(pv, label, **kwargs) self.tweakers[alias] = m return m def add_tweaker(self, pv, alias=None, label=None, mtype="epics_motor", layout=None): if layout is None: layout = self._tweak_container.layout() if mtype == "epics_motor": m = MotorTweak() else: m = SmaractMotorTweak() layout.addWidget(m) m.connect_motor(pv, label) self.tweakers[alias] = m def done_sliding(self): print("done sliding at {}".format(self.slider_fast_x.value())) @pyqtSlot() def saveSampleCameraScreenshot(self): outf = self.get_screenshot_filename() logger.info("saving original clean screenshot: {}".format(outf)) try: sample_camera.saveimage(outf) except Exception as e: logger.warning(e) QMessageBox.warning( self, "Screenshot: failed to save image", "Failed to save screenshot!" ) @pyqtSlot() def saveSampleCameraScreenshotView(self): outf = self.get_screenshot_filename() logger.info("saving view screenshot: {}".format(outf)) exporter = pg.exporters.ImageExporter(self.vb) # set export parameters if needed exporter.parameters()[ "width" ] = 2000 # (note this also affects height parameter) # save to file try: exporter.export(outf) except Exception as e: logger.warning(e) QMessageBox.warning( self, "Screenshot: failed to save viewer image", "Failed to save screenshot of viewer!", ) def get_screenshot_filename(self): global folders logger.info("taking screenhot") prefix = folders.prefix folder = folders.res_folder base = time.strftime("{}_%Y%m%d_%H%M%S.png".format(prefix)) if not os._exists(folder): try: os.makedirs(folder, 0o750, exist_ok=True) except: msg = "Failed to create folder: {}".format(folder) logger.warning(msg) QMessageBox.warning( self, "Screenshot: failed to create folder", "Failed to create output folder for screenshot!\n\n\tScreenshot not taken!", ) raise outf = os.path.join(folder, base) return outf def daq_grid_add_grid(self, gx=None, gy=None): grid_index = len(self._grids) if gx in (False, None): gx, gy = self.getFastX(), self.getFastY() xstep = self._sb_grid_x_step.value() ystep = self._sb_grid_y_step.value() xoffset = self._sb_grid_x_offset.value() yoffset = self._sb_grid_y_offset.value() grid = Grid( x_step=xstep, y_step=ystep, x_offset=xoffset, y_offset=yoffset, gonio_xy=(gx, gy), grid_index=grid_index, parent=self, ) self._grids.append(grid) grid.calculate_gonio_xy() grid.sigRemoveRequested.connect(lambda g=grid: self.remove_grid(g)) self.vb.addItem(grid) def daq_grid_remove_all(self): for grid in self._grids: grid.sigRemoveRequested.emit(grid) # ugly hack self._grids = [] def grids_pause_stage_tracking(self): for grid in self._grids: grid.disable_stage_tracking() def grids_start_stage_tracking(self): for grid in self._grids: grid.enable_stage_tracking() def remove_grid(self, grid): self.vb.removeItem(grid) self._grids.remove(grid) def daq_grid_update(self, grid_index): try: grid = self._grids[grid_index] except: print("grid index not yet there") return points = grid.get_grid_targets() num_points = len(points) etime = float(settings.value("exposureTime")) doc = f"grid_{grid_index} = [" for n, pos in enumerate(points): x, y = pos doc += "[{:8.3f}, {:8.3f}],\n".format(x, y) doc += "]" self._grid_inspect_area.setPlainText(doc) m = "Number of points: {}\nEstimated Time: {:.1f} minutes".format( num_points, num_points * etime / 60. ) self._label_grid_parameters.setText(m) def daq_embl_collect_points(self): coords = self._embl_module.coords points = [[x, y] for x, y, bx, bz, o in coords] points = np.array(points) method = "trajectory" xp = (1000 * points).astype(int) # microns then round int params = (xp[:, 0].tolist(), xp[:, 1].tolist()) self.daq_collect_points( points, visualizer_method=method, visualizer_params=params ) def daq_prelocated_collect_points(self): points = [] data = self.prelocationModule.get_collection_targets() for n, cc in enumerate(data): is_fiducial, gx, gy, cx, cy, ox, oy = cc points.append([gx, gy]) points = np.array(points) method = "trajectory" xp = (1000 * points).astype(int) # microns then round int params = (xp[:, 0].tolist(), xp[:, 1].tolist()) self.daq_collect_points( points, visualizer_method=method, visualizer_params=params ) def daq_grid_collect_grid(self): grid = self._grids[0] # FIXME one grid at a time only points = np.array(grid.get_grid_targets()) method = "grid" params = grid._grid_dimensions # params = ([grid._grid_dimensions[0]], [grid._grid_dimensions[1]]) # FIXME something wrong here< self.daq_collect_points( points, visualizer_method=method, visualizer_params=params ) def daq_grid_findxtals(self): feature_size = self._sb_findxtals_feature_size.value() image = sample_camera.get_image() findObj(-image, objSize=feature_size, viz=1) def check_jungfrau_save(self) -> bool: if jungfrau_detector.is_running_detector(): saveRaw = jungfrau_detector.is_saving_data() if not saveRaw: box = QMessageBox() box.setText("Jungfrau save data disabled!") box.setInformativeText("Jungfrau save data is disabled!") box.setIcon(QMessageBox.Warning) box.setDetailedText( "Choose to abort, enable and continue, or continue without saving raw data" ) btContinue = box.addButton("Continue", QMessageBox.YesRole) btAbort = box.addButton("OMG! Abort", QMessageBox.NoRole) btEnable = box.addButton( "Enable save and continue", QMessageBox.YesRole ) box.exec_() ans = box.clickedButton() if ans == btEnable: jungfrau_detector.set_save_raw(True) return True elif ans == btAbort: logger.info("not doing helical scan") return False return True return True def daq_collect_points(self, points, visualizer_method, visualizer_params): task = self.active_task() XDIR = -1 folders.make_if_needed() if ( option(ACTIVATE_PULSE_PICKER) and not jungfrau_detector.is_running_detector() ): if QMessageBox.No == QMessageBox.question( self, "X-rays but no Jungfrau", "X-rays will be used bu the Jungfrau will not run.\n\n\tContinue?", ): logger.warning("user forgot to turn on the jungfrau") return if option(ACTIVATE_PULSE_PICKER) or not option(SKIP_ESCAPE_TRANSITIONS_IF_SAFE): self.escape_goToDataCollection() ntrigger = len(points) points *= 1000 # delta tau uses micrometers points[:, 0] *= XDIR # fast X axis is reversed etime = settings.value("exposureTime", type=float) vscale = settings.value(DELTATAU_VELOCITY_SCALE, 1.0, type=float) sort_points = option(DELTATAU_SORT_POINTS) # sync_mode : default=2 # 0 : no sync at all # 1 : synchronize start # 2 : synchronize start and adapt motion speed # this function generates the code blocks: # self.sync_wait and self.sync_run # sync_wait can be put in the program to force a timing sync # sync_run are the commands to run the whole program # sync_flag if not using jungfrau =1 otherwise =0 # D.O. shapepath.meta.update(sync_mode=2, sync_flag=1) shapepath.meta.update(sync_mode=0, sync_flag=0) maxacq_points = 116508 samp_time = 0.0002 # s overhead_time = 0.1 acq_per = int( np.ceil((etime * ntrigger + overhead_time) / (maxacq_points * samp_time)) ) logger.info(f"gather acquisotion period = {acq_per}") logger.info(f"velocity scale {vscale}") shapepath.setup_gather(acq_per=acq_per) shapepath.setup_sync(verbose=True) shapepath.setup_coord_trf() shapepath.points = np.copy(points) if TASK_GRID == task: width, height = visualizer_params logger.debug(f"grid: {width} x {height}") details_1 = [width] details_2 = [height] shapepath.sort_points(xy=False, grp_sz=height) elif task in (TASK_PRELOCATED, TASK_EMBL): if sort_points: shapepath.sort_points() self.daq_method_prelocated_remove_markers() details_1, details_2 = visualizer_params shapepath.setup_motion( mode=3, # 1 = bad pvt 3 = pft (pvt via inverse fourier transform) pt2pt_time=etime * 1000., fnPrg=folders.get_prefixed_file("_program.prg"), scale=vscale, # velocity at target position scaling: 1=optimal speed, 0=zero speed dwell=10, # milli-seconds wait ) shapepath.run() self.qprogress = QProgressDialog(self) self.qprogress.setRange(0, 0) self.qprogress.setLabelText("Acquiring GRID") self.qprogress.setCancelButtonText("Abort Acquisition") self.qprogress.canceled.connect(self.complete_daq) self.qprogress.setWindowModality(Qt.WindowModal) self.qprogress.setAutoClose(True) self.qprogress.show() sequencer_steps = [lambda: self.grids_pause_stage_tracking()] if jungfrau_detector.is_running_detector(): if not self.check_jungfrau_save(): # user aborted run from save data dialog return n_frames = ntrigger uid = settings.value(EXPERIMENT_UID, type=int) backend_extras = self.jungfrau.get_parameters() backend_extras.update( { "swissmx_trajectory_method": visualizer_method, "swissmx_trajectory_details_1": details_1, "swissmx_trajectory_details_2": details_2, } ) jungfrau_detector.set_number_of_frames(n_frames) jungfrau_detector.set_data_owner_uid(uid) sequencer_steps.extend( [ lambda: jungfrau_detector.configure( n_frames=n_frames, outfile=folders.prefix, outdir=folders.raw_folder, uid=uid, backend_extras=backend_extras, ), lambda: jungfrau_detector.arm(), ] ) sequencer_steps.append(lambda: shapepath.wait_armed()) if option(ACTIVATE_PULSE_PICKER): sequencer_steps.append(lambda: pulsePicker.open()) # if settings.value("scanning/trigger_microscope_camera", type=bool): # sample_camera.switch_to_trigger(True) sequencer_steps.append(lambda: shapepath.trigger(wait=0.5)) def shapepath_progress(): while True: p = shapepath.progress() if p < 0: break time.sleep(0.1) self.qprogress.setLabelText(f"Acquiring GRID {p:.0f} / {ntrigger}") logger.warning(f"motion complete!") # sample_camera.stop_camera() # sample_camera.switch_to_trigger(False) # sample_camera.save_buffer_series(folders.prefix) sequencer_steps.append(shapepath_progress) if option(ACTIVATE_PULSE_PICKER): sequencer_steps.append(lambda: pulsePicker.close()) sequencer_steps.append(lambda: jungfrau_detector.wait_finished()) sequencer_steps.append(lambda: self.grids_start_stage_tracking()) self.sequencer = Sequencer(steps=sequencer_steps) self._thread = QThread() self._thread.setObjectName("acquisition_thread") self.sequencer.moveToThread(self._thread) self.sequencer.finished.connect(self.daq_collect_points_wrapup) self._thread.started.connect(self.sequencer.run_sequence) self._thread.start() def run_steps(self, steps, title, at_end=None, cancelable=False): dlg = QProgressDialog(self) dlg.setWindowTitle(title) dlg.setWindowModality(Qt.WindowModal) dlg.setMinimumDuration(0) if not cancelable: dlg.setCancelButton(None) dlg.setRange(0, 0) dlg.setLabelText(f"{title}
") dlg.setAutoClose(True) dlg.show() dlg.setValue(random.randint(1, 20)) class Runner(QObject): finito = pyqtSignal() timeoutExpired = pyqtSignal() errorHappened = pyqtSignal(str) result = pyqtSignal(str) def __init__(self, step_to_run): super().__init__() self.step = step_to_run self.exception = None self.done = False def run(self): try: self.step() except Exception as e: logger.debug(" +> step exception") self.exception = str(e) self.errorHappened.emit(str(e)) self.finito.emit() for n, step in enumerate(steps): logger.info(f"running step {step.title}") dlg.setLabelText(f"{title}
{step.title}") dlg.setValue(n) thread = QThread() runner = Runner(step) runner.moveToThread(thread) thread.started.connect(runner.run) runner.finito.connect(thread.quit) thread.start() while thread.isRunning(): dlg.setValue(random.randint(1, 20)) time.sleep(0.01) if dlg.wasCanceled(): # FIXME ensure we abort the running thread break if dlg.wasCanceled(): break if runner.exception is not None: QMessageBox.critical(self, step.title, str(runner.exception)) break if dlg.wasCanceled(): logger.error(f"sequence {title} was cancelled by user") raise AcquisitionAbortedException(f"sequence {title} was cancelled by user") if at_end is not None: at_end() dlg.reset() def daq_collect_points_wrapup(self): self.qprogress.reset() if self._thread.isRunning(): self._thread.quit() shapepath.gather_upload( os.path.join(folders.res_folder, folders.prefix + ".npz") ) if option(DELTATAU_SHOW_PLOTS): dp = DebugPlot(shapepath) dp.plot_gather(mode=1) pyplot.show() if TASK_PRELOCATED == self.active_task(): self.daq_method_prelocated_update_markers() if option(ACTIVATE_PULSE_PICKER) or not option(SKIP_ESCAPE_TRANSITIONS_IF_SAFE): self.escape_goToSampleAlignment() sequence = { "delta tau program": shapepath.prg, "points": shapepath.points.tolist(), "timestamp": tdstamp(), } sfname = folders.get_prefixed_file("_ScanInfo.json") try: with open(sfname, "w") as sf: logger.info("writing scan info into: {}".format(sfname)) sf.write(json.dumps(sequence)) except: logger.warning(f"failed to write scan info to {sfname}") self.re_connect_collect_button() jungfrau_detector.abort() self.increaseRunNumberRequest.emit() def daq_collect_update_inspect(self, msg): te = self._inspect m = te.toPlainText() te.setPlainText(m + msg + "\n") te.verticalScrollBar().setValue(te.verticalScrollBar().maximum()) def daq_helical_collect(self): """[ [{ 0: (0.0, 1.238401694734829, 3.527, 0.936, 0.001), 120: (0.0, 1.3890393596274455, -1.957242824091597, -0.5639999999999998, 120.001), 240: (0.0, 1.3866130349657206, -1.1642619618562273, 3.105, 240.001) }, { 0: (0.0, 1.238401694734829, 3.527, 0.936, 0.001), 120: (0.0, 1.3890393596274455, -1.957242824091597, -0.5639999999999998, 120.001), 240: (0.0, 1.3866130349657206, -1.1642619618562273, 3.105, 240.001) }] ] """ logger.info("executing collection") htab = self._helical_scan_table num_h = htab.scanHorizontalCount() num_v = htab.scanVerticalCount() if ( settings.value(ACTIVATE_PULSE_PICKER) and not jungfrau_detector.is_running_detector() ): if QMessageBox.No == QMessageBox.question( self, "X-rays but no Jungfrau", "X-rays will be used bu the Jungfrau will not run.\n\n\tContinue?", ): logger.warning("user forgot to turn on the jungfrau") return if option(ACTIVATE_PULSE_PICKER) or not option(SKIP_ESCAPE_TRANSITIONS_IF_SAFE): self.escape_goToDataCollection() folders.make_if_needed() data = htab.get_data() logger.debug(data) start, end = data[0] FX, FY, BX, BZ, O = range(5) x = ( (-1000 * start[0][BX], -1000 * start[120][BX], -1000 * start[240][BX]), (-1000 * end[0][BX], -1000 * end[120][BX], -1000 * end[240][BX]), ) y = (1000 * start[0][FY], 1000 * end[0][FY]) z = ( (-1000 * start[0][BZ], -1000 * start[120][BZ], -1000 * start[240][BZ]), (-1000 * end[0][BZ], -1000 * end[120][BZ], -1000 * end[240][BZ]), ) if jungfrau_detector.is_running_detector(): if not self.check_jungfrau_save(): return saveRaw = jungfrau_detector.is_saving_data() logger.debug(f"x = {x}") logger.debug(f"y = {y}") logger.debug(f"z = {z}") oscillationAngle = settings.value("oscillationAngle", type=float) exposureTime = 1000 * settings.value("exposureTime", type=float) blastRadius = settings.value("blastRadius", type=float) totalRange = num_v * num_h * oscillationAngle # sync_mode : default=2 # 0 : no sync at all # 1 : synchronize start # 2 : synchronize start and adapt motion speed # this function generates the code blocks: # self.sync_wait and self.sync_run # sync_wait can be put in the program to force a timing sync # sync_run are the commands to run the whole program # sync_flag if not using jungfrau =1 otherwise =0 # D.O. helical.meta.update(sync_mode=2, sync_flag=1) helical.meta.update(sync_mode=0, sync_flag=0) helical.calcParam(x=x, y=y, z=z) helical.setup_gather() helical.setup_sync() helical.setup_coord_trf() mode = 1 hRng = (-blastRadius * num_h, blastRadius * num_h) w_start = 1000 * htab.startAngle() wRng = (w_start, w_start + (totalRange * 1000)) logger.info( f"helical params mode={mode}, cnthor={num_h}, cntvert={num_v}, hrng={hRng}, wrng={wRng}" ) helical.setup_motion( mode=mode, cntHor=num_h, cntVert=num_v, hRng=hRng, wRng=wRng, pt2pt_time=exposureTime, ) # hRng in micrometers helical.run() try: with open(folders.get_prefixed_file("_helical_debug.cmd"), "w") as fd: fd.write("calcParam(x={}, y={}, z={})".format(x, y, z)) except: pass self.qprogress = QProgressDialog(self) self.qprogress.setRange(0, 0) self.qprogress.setLabelText("Acquiring HELICAL") self.qprogress.setCancelButtonText("Abort Acquisition") self.qprogress.canceled.connect(self.complete_daq) self.qprogress.setWindowModality(Qt.WindowModal) self.qprogress.setAutoClose(True) self.qprogress.show() sequencer_steps = [lambda: self.grids_pause_stage_tracking()] n_frames = num_h * num_v if jungfrau_detector.is_running_detector(): uid = settings.value(EXPERIMENT_UID, type=int) backend_extras = self.jungfrau.get_parameters() backend_extras.update( { "swissmx_trajectory_method": "grid", "swissmx_trajectory_details_1": [-num_h], "swissmx_trajectory_details_2": [num_v], } ) jungfrau_detector.set_number_of_frames(n_frames) jungfrau_detector.set_data_owner_uid(uid) sequencer_steps.extend( [ lambda: jungfrau_detector.configure( n_frames=n_frames, outfile=folders.prefix, outdir=folders.raw_folder, uid=uid, backend_extras=backend_extras, ), lambda: jungfrau_detector.arm(), ] ) sequencer_steps.append(lambda: helical.wait_armed()) if settings.value(ACTIVATE_PULSE_PICKER): sequencer_steps.extend( [lambda: pulsePicker.open(), lambda: pend_event(0.1)] ) sequencer_steps.append(lambda: helical.trigger()) def motion_progress(): while True: p = helical.progress() if p < 0: break time.sleep(0.1) self.qprogress.setLabelText(f"Acquiring HELICAL {p:.0f} / {n_frames}") logger.warning(f"helical motion complete!") sequencer_steps.append(motion_progress) if settings.value(ACTIVATE_PULSE_PICKER): sequencer_steps.append(lambda: pulsePicker.close()) sequencer_steps.append(lambda: self.grids_start_stage_tracking()) self.sequencer = Sequencer(steps=sequencer_steps) self._thread = QThread() self._thread.setObjectName("acquisition_thread") self.sequencer.moveToThread(self._thread) self.sequencer.finished.connect(self.daq_helical_collect_wrapup) self._thread.started.connect(self.sequencer.run_sequence) self._thread.start() def daq_helical_collect_wrapup(self): try: self.qprogress.reset() except: pass if self._thread.isRunning(): self._thread.quit() helical.gather_upload(os.path.join(folders.res_folder, folders.prefix + ".npz")) self.re_connect_collect_button() jungfrau_detector.abort() if option(ACTIVATE_PULSE_PICKER) or not option(SKIP_ESCAPE_TRANSITIONS_IF_SAFE): self.escape_goToSampleAlignment() self.increaseRunNumberRequest.emit() if option(DELTATAU_SHOW_PLOTS): hsgui = HelicalScanGui(helical) hsgui.interactive_anim() def complete_daq(self): logger.info("daq completed: cleaning up") try: self.qprogress.reset() except: pass try: if self._thread.isRunning(): self._thread.quit() except: pass finally: self.abort_measurement() self.re_connect_collect_button() def build_daq_methods_grid_tab(self): self._grids = [] tab = self._tab_daq_method_grid layout = tab.layout() # gridlayout self._sb_grid_x_step.setValue(30.0) self._sb_grid_y_step.setValue(30.0) self._bt_add_grid.clicked.connect(self.daq_grid_add_grid) self._bt_remove_all_grids.clicked.connect(self.daq_grid_remove_all) self._find_targets_from_microscope_image.clicked.connect( self.daq_grid_findxtals ) self.addGridRequest.connect(self.daq_grid_add_grid) self.gridUpdated.connect(self.daq_grid_update) def re_connect_collect_button(self, callback=None, **kwargs): logger.debug("re_connect_collect_button() {} => {}".format(callback, kwargs)) return # button = self._button_collect # if callback is None: # callback = self.execute_collection # button.setAccessibleName("collect_button") # kwargs["label"] = "Collect" # # try: # button.disconnect() # finally: # button.clicked.connect(callback) # # if "accessibleName" in kwargs: # button.setAccessibleName(kwargs["accessibleName"]) # # if "label" in kwargs: # button.setText(kwargs["label"]) # self.load_stylesheet() def collect_abort_grid(self): self._is_aborted = True try: self._eigerwaitthread._is_aborted = True except: pass logger.warning("aborting grid scan") self.abort_measurement() delta_tau.abort() jungfrau_detector.disarm() self.re_connect_collect_button() def execute_collection(self): task = self.active_task() self._is_aborted = False method = self._tabs_daq_methods.currentWidget().accessibleName() if task == TASK_POINT_SHOOT: QMessageBox.information("not implemented", "not implemented") self.re_connect_collect_button( callback=self.collect_abort_grid, accessibleName="grid_abort", label="Abort", ) elif task == TASK_GRID: self.re_connect_collect_button( callback=self.collect_abort_grid, accessibleName="grid_abort", label="Abort Grid Scan", ) self._inspect = self._grid_inspect_area self._inspect.setPlainText("") self.daq_grid_collect_grid() elif task == TASK_PRELOCATED: self._inspect = self._preloc_inspect_area self._inspect.setPlainText("") self.daq_prelocated_collect_points() elif task == TASK_HELICAL: self.daq_helical_collect() elif task == TASK_EMBL: self.daq_embl_collect_points() def create_escape_toolbar(self): cont = self._rightmost w = QGroupBox("Escape") layout = QHBoxLayout() w.setLayout(layout) but = QPushButton("Exchange\nSample") but.setAccessibleName("escape_button_se") but.setObjectName("action_SampleExchange") but.clicked.connect(self.escape_goToSampleExchange) layout.addWidget(but) but = QPushButton("Alignment") but.setAccessibleName("escape_button_sa") but.clicked.connect(self.escape_goToSampleAlignment) layout.addWidget(but) but = QPushButton("Collection") but.setAccessibleName("escape_button_dc") but.clicked.connect(self.escape_goToDataCollection) layout.addWidget(but) cont.layout().addWidget(w) def init_actions(self): self.toolBar.setToolButtonStyle(Qt.ToolButtonTextUnderIcon) self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.SHIFT + Qt.Key_S), self) self.shortcut.activated.connect(self.saveSampleCameraScreenshot) self.shortcut = QShortcut(QKeySequence(Qt.Key_F2), self) self.shortcut.activated.connect(self.saveSampleCameraScreenshot) self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.Key_S), self) self.shortcut.activated.connect(self.saveSampleCameraScreenshotView) self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.SHIFT + Qt.Key_T), self) self.shortcut.activated.connect(lambda: qutilities.toggle_warn(SKIP_ESCAPE_TRANSITIONS_IF_SAFE)) for k in range(10): qkey = k + Qt.Key_0 self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.SHIFT + qkey), self) self.shortcut.activated.connect(lambda key=k: self.saveBookmark(key)) self.shortcut = QShortcut(QKeySequence(Qt.CTRL + qkey), self) self.shortcut.activated.connect(lambda key=k: self.gotoBookmark(key)) self._button_collect.clicked.connect(self.execute_collection) # Toolbar buttons icon_size = QSize(50, 50) icon = qtawesome.icon("material.photo_camera") action = QAction(icon, "Save View", self) action.setToolTip( "(Ctrl+S) Take a screenshot of the currently visible sample image, including markers. Saves in current folder." ) action.triggered.connect(self.saveSampleCameraScreenshotView) self.toolBar.addAction(action) action = QAction(icon, "Save Original", self) action.setToolTip( "(Ctrl+Shift+S) Take a screenshot of the sample image, without markers. Saves in current folder." ) action.triggered.connect(self.saveSampleCameraScreenshot) self.toolBar.addAction(action) if os.getenv("DEVELOPMENT_VERSION"): icon = qtawesome.icon("material.style") action = QAction(icon, "Reload Stylesheet", self) action.triggered.connect(self.load_stylesheet) self.toolBar.addAction(action) # icon = qtawesome.icon("material.shutter_speed") action = QAction("Use Pulse Picker", self) action.setCheckable(True) action.setChecked(option(ACTIVATE_PULSE_PICKER)) action.triggered.connect(lambda: toggle_option(ACTIVATE_PULSE_PICKER)) self.toolBar.addAction(action) icon = qtawesome.icon("material.timeline") action = QAction(icon, "Add Line", self) action.triggered.connect(self.roi_add_line) self.toolBar.addAction(action) icon = qtawesome.icon("material.tab_unselected") action = QAction(icon, "Add Rect", self) action.triggered.connect(self.roi_add_rect) self.toolBar.addAction(action) icon = qtawesome.icon("material.play_for_work") action = QAction(icon, "TELL Mount", self) action.setToolTip( "BookMark(0) => Move stages CZ, CX, FX, FY, and Omega to Tell sample changer mount position" ) action.triggered.connect(self.escape_goToTellMountPosition) self.toolBar.addAction(action) self.toolBar.addWidget(qutilities.horiz_spacer()) icon = qtawesome.icon("material.sync") action = QAction(icon, "Sample\nExchange", self) action.setToolTip("Move devices so a sample can be exchanged.") action.setObjectName("action_SampleExchange") action.triggered.connect(self.escape_goToSampleExchange) self.toolBar.addAction(action) self.toolBar.widgetForAction(action).setAccessibleName("action_SampleExchange") icon = qtawesome.icon("material.my_location") action = QAction(icon, "Sample\nAlignment", self) action.setToolTip("Move devices so a sample can be aligned.") action.triggered.connect(self.escape_goToSampleAlignment) self.toolBar.addAction(action) self.toolBar.widgetForAction(action).setAccessibleName("action_SampleAlignment") icon = qtawesome.icon("material.fingerprint") action = QAction(icon, "Data\nCollection", self) action.setToolTip("Move devices so a sample can be collected.") action.triggered.connect(self.escape_goToDataCollection) self.toolBar.addAction(action) self.toolBar.widgetForAction(action).setAccessibleName("action_DataCollection") self.actionQuit.triggered.connect(self.really_quit) self.actionPreferences.triggered.connect(self.openPreferencesDialog) self.actionHome_Fast_Stages.triggered.connect(self.home_deltatau_faststages) self.actionUser_Storage.triggered.connect(self.update_user_and_storage) self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.SHIFT + Qt.Key_L), self) self.shortcut.activated.connect(self.roi_add_line) self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.SHIFT + Qt.Key_R), self) self.shortcut.activated.connect(self.roi_add_rect) self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.SHIFT + Qt.Key_M), self) self.shortcut.activated.connect(self.toggle_mouse_tracking) self.shortcut = QShortcut(QKeySequence(Qt.Key_F11), self) self.shortcut.activated.connect(self.toggle_maximized) self.shortcut = QShortcut(QKeySequence(Qt.Key_F12), self) self.shortcut.activated.connect(self.show_window_configuration) self.shortcut = QShortcut(QKeySequence(Qt.Key_F5), self) self.shortcut.activated.connect(self.generic_dialog) self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.Key_B), self) self.shortcut.activated.connect(lambda: self._beammark.toggle_handle()) self.shortcut = QShortcut(QKeySequence(Qt.CTRL + Qt.Key_P), self) self.shortcut.activated.connect(self.camera_pause_toggle) # adding a menu entry to one of the menus action = QAction("Beam Marker Size", self) action.setToolTip( "Update the beam marker size on GUI; *not* the actual beam size!" ) action.setStatusTip( "Update the beam marker size on GUI; *not* the actual beam size!" ) action.triggered.connect(self.set_beam_size_marker_dialog) self.menuAdvanced.addAction(action) action = QAction("Backlight Reference Positions", self) action.setToolTip("Update the reference positions for the backlight") action.setStatusTip("Update the reference positions for the backlight") action.triggered.connect(self.set_backlight_positions_dialog) self.menuAdvanced.addAction(action) action = QAction("Collimator Reference Positions", self) action.setToolTip("Update the reference positions for the collimator") action.setStatusTip("Update the reference positions for the collimator") action.triggered.connect(self.set_collimator_reference_positions) self.menuAdvanced.addAction(action) action = QAction("Cryojet Reference Positions", self) action.setToolTip( "Update the reference positions for the cryojet nozzle position" ) action.setStatusTip( "Update the reference positions for the cryojet nozzle position" ) action.triggered.connect(self.set_cryojet_positions_dialog) self.menuAdvanced.addAction(action) action = QAction("Post sample tube Reference Positions", self) action.setToolTip("Update the reference positions for the post tube") action.setStatusTip("Update the reference positions for the post tube") action.triggered.connect(self.set_posttube_references_dialog) self.menuAdvanced.addAction(action) action = QAction("Delta Tau Parameters", self) action.setToolTip("Parameters affecting the Delta Tau") action.setStatusTip("Parameters affecting the Delta Tau") action.triggered.connect(self.set_deltatau_parameters) self.menuAdvanced.addAction(action) action = QAction("Tell Mount Positions", self) action.setToolTip("Mount positions for TELL sample changer") action.setStatusTip("Parameters affecting the Delta Tau") action.triggered.connect(self.set_tell_mount_positions) self.menuAdvanced.addAction(action) def daq_method_prelocated_remove_markers(self): try: for m in self._marker_rois: m.disconnect_signals() self.vb.removeItem(m) except Exception as e: logger.warning("maybe failed removing markers: {}".format(e)) self._marker_rois = [] def pause_all_markers(self): for m in self._marker_rois: m.disconnect_signals() def resume_all_markers(self): for m in self._marker_rois: m.reconnect_signals() def daq_method_prelocated_update_markers(self): self.daq_method_prelocated_remove_markers() data = self.prelocationModule.get_data() add_xtals = self.prelocationModule._xtals_transformed draw_xtals = self.prelocationModule.set_draw_crystal_marks vb = self.vb self._marker_rois = [] ppm = self.getPpm() for n, cc in enumerate(data): is_fiducial, gx, gy, cx, cy, ox, oy = cc if not is_fiducial: if not (add_xtals and draw_xtals): continue logger.debug( f"adding {'fiducial' if is_fiducial else 'xtal'} mark #{n}: {is_fiducial} {gx:.3f}, {gy:.3f}, {cx:.1f}, {cy:.1f}" ) m = CrystalCircle( gonio_xy=(gx, gy), parent=self, model_row_index=n, is_fiducial=is_fiducial, ppm=ppm, ) # m.sigRegionChangeFinished.connect(lambda roi=m: self.daq_method_prelocated_update_model(roi)) self._marker_rois.append(m) vb.addItem(m) for c in self._marker_rois: c.reconnect_signals() c.follow_stage() def daq_method_prelocated_set_fiducial(self, camx, camy, gx, gy): logger.debug(f"camx, camy: {camx}, {camy}, fx, fy: {gx, gy}") self.prelocationModule.set_fiducial_coords(camx, camy, gx, gy) def daq_method_prelocated_append_data(self, x, y, gx, gy): logger.debug("appending to model: {} {}".format((x, y), (gx, gy))) self.prelocationModule.append_data((x, y, gx, gy)) self.daq_method_prelocated_update_markers() def daq_method_prelocated_update_model(self, roi): row = roi.get_model_row() pos = roi.pos() self.prelocationModule.set_data_camera(row, pos) logger.debug("updating row {} => {}".format(row, pos)) def daq_method_prelocated_add_reference(self): self._references.append(pg.CircleROI()) def build_daq_methods_prelocated_tab(self): tab = self._tab_daqmethod_prelocated self.prelocationModule = PrelocatedCoordinatesModel.PrelocatedCoordinates( parent=self ) tab.layout().addWidget(self.prelocationModule) self.prelocationModule.prefixSelected.connect( lambda prefix: self._le_prefix.setText(prefix) ) self.prelocationModule.dataFileLoaded.connect( self.daq_method_prelocated_update_markers ) self.prelocationModule.prelocatedDataUpdated.connect( self.daq_method_prelocated_update_markers ) self.prelocationModule.markersDeleted.connect( self.daq_method_prelocated_remove_markers ) self.fiducialPositionSelected.connect(self.daq_method_prelocated_set_fiducial) self.appendPrelocatedPosition.connect(self.daq_method_prelocated_append_data) self.prelocationModule.moveFastStageRequest.connect(self.move_fast_stage) self._preloc_inspect_area = QPlainTextEdit() tab.layout().addWidget(self._preloc_inspect_area) def move_fast_stage(self, x, y): logger.info(f"received request to move gonio to x, y = {x:.3f}, {y:.3f} mm") fx_motor, fy_motor, bx_motor, bz_motor, omega_motor = self.get_gonio_tweakers() fx_motor.move_motor_to_position(x) fy_motor.move_motor_to_position(y) def set_beam_size_marker_dialog(self): w, h = settings.value(BEAM_SIZE) d = GenericDialog( title="Beamsize", message="Enter the size of the beam in microns", inputs={ "width": ( "Width", int(w), Spinner(int(w), min=1, max=200, suffix=" \u00B5m"), ), "height": ( "Height", int(h), Spinner(int(h), min=1, max=200, suffix=" \u00B5m"), ), }, ) if d.exec(): results = d.results logger.info("Updating beamsize to {}".format(results)) w, h = results["width"], results["height"] logger.debug("types {}".format(type(w))) settings.setValue(BEAM_SIZE, (w, h)) self._beammark.set_beam_size((w, h)) settings.sync() def set_posttube_references_dialog(self): x_up = settings.value("post_sample_tube/x_up", 0.0) y_up = settings.value("post_sample_tube/y_up", 0.0) x_down = settings.value("post_sample_tube/x_down", 0.0) y_down = settings.value("post_sample_tube/y_down", 0.0) dx = settings.value("post_sample_tube/dx", 0.0) dy = settings.value("post_sample_tube/dy", 0.0) tz_in = settings.value("post_sample_tube/z_in", 0.0) tz_out = settings.value("post_sample_tube/z_out", 0.0) d = GenericDialog( title="Post Sample Tube Configuration", message="Enter the relative displacements for X and Y to move the post sample tube either in or out.", inputs={ "post_sample_tube/x_up": ( "Up X", float(x_up), Spinner(float(x_up), decimals=3, min=-45.0, max=15.0, suffix=" mm"), ), "post_sample_tube/y_up": ( "Up Y", float(y_up), Spinner(float(y_up), decimals=3, min=-45.0, max=15.0, suffix=" mm"), ), "post_sample_tube/x_down": ( "Down X", float(x_down), Spinner( float(x_down), decimals=3, min=-45.0, max=15.0, suffix=" mm" ), ), "post_sample_tube/y_down": ( "Down Y", float(y_down), Spinner( float(y_down), decimals=3, min=-45.0, max=15.0, suffix=" mm" ), ), "post_sample_tube/dx": ( "out delta X", float(dx), Spinner(float(dx), decimals=3, min=-32.0, max=32.0, suffix=" mm"), ), "post_sample_tube/dy": ( "out delta Y", float(dy), Spinner(float(dy), decimals=3, min=-32.0, max=32.0, suffix=" mm"), ), "post_sample_tube/z_in": ( "tube Z in position", float(tz_in), Spinner(float(tz_in), decimals=3, min=-8.0, max=1.0, suffix=" mm"), ), "post_sample_tube/z_out": ( "tube Z OUT position", float(tz_out), Spinner(float(tz_out), decimals=3, min=-8.0, max=1.0, suffix=" mm"), ), }, ) if d.exec(): results = d.results logger.info("setting post-sample-tube displacements {}".format(results)) for k, v in results.items(): settings.setValue(k, v) settings.sync() def set_collimator_reference_positions(self): x_out = float(settings.value("collimator/dx", 0.0)) y_out = float(settings.value("collimator/dy", 0.0)) x_in = float(settings.value("collimator/x_in", 0.0)) y_in = float(settings.value("collimator/y_in", 0.0)) d = GenericDialog( title="Collimator configuration", message="Enter reference positions for the collimator", inputs={ "collimator/dx": ( "Collimator out deltaX", x_out, Spinner(x_out, decimals=3, min=-15.9, max=15.9, suffix=" mm"), ), "collimator/dy": ( "Collimator out deltaY", y_out, Spinner(y_out, decimals=3, min=-15.9, max=15.9, suffix=" mm"), ), "collimator/x_in": ( "Collimator in X", x_in, Spinner(x_in, decimals=3, min=-15.9, max=15.9, suffix=" mm"), ), "collimator/y_in": ( "Collimator in Y", y_in, Spinner(y_in, decimals=3, min=-15.9, max=15.9, suffix=" mm"), ), }, ) if d.exec(): results = d.results logger.info("setting collimator reference positions {}".format(results)) for k, v in results.items(): settings.setValue(k, v) settings.sync() def set_backlight_positions_dialog(self): p_in = int(settings.value("backlight/in")) p_out = int(settings.value("backlight/out")) p_diode = int(settings.value("backlight/diode")) d = GenericDialog( title="Back Light configuration", message="Enter reference positions for the backlight", inputs={ "backlight/in": ( "In position", p_in, Spinner(p_in, min=-30000, max=10), ), "backlight/out": ( "Out position", p_out, Spinner(p_out, min=-1000, max=10), ), "backlight/diode": ( "Diode position", p_diode, Spinner(p_diode, min=-40000, max=-20000), ), }, ) if d.exec(): results = d.results logger.info("setting back light reference positions {}".format(results)) for k, v in results.items(): settings.setValue(k, v) settings.sync() def set_cryojet_positions_dialog(self): p_in = settings.value(CRYOJET_NOZZLE_IN, type=float) p_out = settings.value(CRYOJET_NOZZLE_OUT, type=float) motion_enabled = option(CRYOJET_MOTION_ENABLED) d = GenericDialog( title="Cryojet Nozzle Configuration", message="Enter reference positions for the cryojet nozzle position", inputs={ CRYOJET_NOZZLE_IN: ("In position", p_in, Spinner(p_in, min=3, max=15)), CRYOJET_NOZZLE_OUT: ( "Out position", p_out, Spinner(p_out, min=-1000, max=10), ), CRYOJET_MOTION_ENABLED: ( "Move Cryojet in Transitions", motion_enabled, Checkbox(motion_enabled, "move cryojet"), ), }, ) if d.exec(): results = d.results logger.info("setting cryojet reference positions {}".format(results)) for k, v in results.items(): settings.setValue(k, v) settings.sync() def set_deltatau_parameters(self): a = settings.value(DELTATAU_VELOCITY_SCALE, 1, type=float) b = option(DELTATAU_SORT_POINTS) c = option(DELTATAU_OMEGACOS) d = option(DELTATAU_SHOW_PLOTS) d = GenericDialog( title="Delta Tau Parameters", message="These parameters affect the data collection.", inputs={ DELTATAU_VELOCITY_SCALE: ( "Velocity Scale (1=optimal, 0=zero vel at target)", a, Spinner(a, min=0, max=1, suffix=""), ), DELTATAU_SORT_POINTS: ( "Sort pointshoot/prelocated coords", b, Checkbox(b, "sort points"), ), DELTATAU_SHOW_PLOTS: ( "show plots after collection", d, Checkbox(d, "correct"), ), DELTATAU_OMEGACOS: ( "preloc. correct omega tilt", c, Checkbox(c, "correct"), ), }, ) if d.exec(): results = d.results logger.info("setting delta tau parameters {}".format(results)) for k, v in results.items(): settings.setValue(k, v) settings.sync() def set_tell_mount_positions(self): AUTODRY_ENABLED = "tell/autodry_enabled" AUTODRY_MAXMOUNTS = "tell/autodry_max_number_of_mounts" AUTODRY_MAXTIME = "tell/autodry_max_time" SECS_HOURS = 60 * 60 enabled = option(AUTODRY_ENABLED) maxtime = settings.value(AUTODRY_MAXTIME, type=float) / SECS_HOURS maxmounts = settings.value(AUTODRY_MAXMOUNTS, type=int) d = GenericDialog( title="TELL Settings", message="These control some features of the TELL sample changer", inputs={ AUTODRY_ENABLED: ("Auto dry", enabled, Checkbox(enabled, "enabled")), AUTODRY_MAXMOUNTS: ( "Max. num. mounts between dry", maxmounts, Spinner(maxmounts, decimals=0, min=1, max=100, suffix=" mounts"), ), AUTODRY_MAXTIME: ( "Max. time between dry", maxtime, Spinner(maxtime, decimals=1, min=0.5, max=5, suffix=" hours"), ), }, ) if d.exec(): results = d.results logger.info("setting tell parameters {}".format(results)) for k, v in results.items(): if k == AUTODRY_MAXTIME: v = v * SECS_HOURS settings.setValue(k, v) settings.sync() def generic_dialog(self, title="test", message=""): d = GenericDialog(title="Beamsize", message="some message") if d.exec(): results = d.results print(results) def toggle_maximized(self): if self.isMaximized(): self.showNormal() else: self.showMaximized() def show_window_configuration(self): logger.debug("maximized? {}".format(self.isMaximized())) def home_deltatau_faststages(self): logger.warning("homing fast stages") epics.PV("SAR-EXPMX1:ASYN.AOUT").put(b"enable plc 1") def update_user_and_storage(self): global folders pgroup = settings.value(EXPERIMENT_PGROUP, "not_set_yet") diag = GenericDialog( title="P-group for experiment", message="Enter the p-group to be used", inputs={EXPERIMENT_PGROUP: ("P-group", pgroup, QLineEdit(pgroup))}, use_buttons=False, ) diag.setModal(True) run_user = getpass.getuser() pgrp_re = re.compile(r"^p(?P\d{5})$") if not self.is_recently_active(): diag.exec() results = diag.results for k, v in results.items(): m = pgrp_re.match(v) if m: settings.setValue(k, v) settings.setValue(EXPERIMENT_UID, int(m["uid"])) else: QMessageBox.critical( self, "wrong P-group format", "P-groups are in the format:\n\n\t\tp?????\n\n\twhere ? = digit", ) self.update_user_and_storage() return settings.sync() folders.set_pgroup(settings.value(EXPERIMENT_PGROUP)) try: folders.check_permissons() except: folder = folders.pgroup_folder pgroup = settings.value(EXPERIMENT_PGROUP) box = QMessageBox() box.setText("No Write Permission!") box.setInformativeText( "User {} has no write access to p-group {} folder:\n ➜ {}\n".format( run_user, pgroup, folder ) ) box.setIcon(QMessageBox.Warning) box.setDetailedText( "The folder /sf/bernina/data/{pgroup}/res/ has to writable by the user {user}, currently running the SwissMX app.".format( pgroup=pgroup, user=run_user ) ) btIgnore = box.addButton("Ignore", QMessageBox.NoRole) btRetry = box.addButton("Le'me try again", QMessageBox.YesRole) box.exec_() ans = box.clickedButton() if ans == btRetry: self.update_user_and_storage() elif ans == btIgnore: logger.warning("no write access to pgroup but user didn't care") self._label_pgroup.setText(settings.value(EXPERIMENT_PGROUP)) def init_settings(self): # settings = Settings global folders s = settings keys = s.allKeys() if ACTIVATE_PULSE_PICKER not in keys: settings.setValue(ACTIVATE_PULSE_PICKER, False) if SKIP_ESCAPE_TRANSITIONS_IF_SAFE not in keys: settings.setValue(SKIP_ESCAPE_TRANSITIONS_IF_SAFE, False) if EXPERIMENT_PGROUP not in keys: self.update_user_and_storage() if "hits/marker_size" not in keys: settings.setValue("hits/marker_size", 10) if "window/geometry" in keys: self.restoreGeometry(s.value("window/geometry", "")) self.restoreState(s.value("window/windowState", "")) if "window/main_splitter" in keys: sizes = [int(s) for s in s.value("window/main_splitter")] self._main_splitter.setSizes(sizes) else: self._main_splitter.setSizes([500, 1200]) if "graphs/show_auxiliary" not in keys: s.setValue("graphs/show_auxiliary", False) if "graphs/auxiliary_graph_index" not in keys: s.setValue("graphs/auxiliary_graph_index", 0) if "scan_request/last_location" not in keys: d10 = os.path.join(os.path.expanduser("~"), "Data10") s.setValue("scan_request/last_location", d10) if BEAM_MARKER_POSITIONS in keys: self._beam_markers = s.value(BEAM_MARKER_POSITIONS) self.update_beam_marker_fitters() logger.info("read beam markers {}".format(self._beam_markers)) if BEAM_SIZE in keys: logger.info( "setting beamsize from stored settings: {}".format(s.value(BEAM_SIZE)) ) else: logger.warning("beam size may not reflect reality, please check") s.setValue(BEAM_SIZE, (40, 20)) if CAMERA_TRANSFORMATIONS in keys: sample_camera.set_transformations(s.value(CAMERA_TRANSFORMATIONS)) if CAMERA_ZOOM_TO_PPM in keys: self._zoom_to_ppm = s.value(CAMERA_ZOOM_TO_PPM) logger.info(f"{CAMERA_ZOOM_TO_PPM} updated: {self._zoom_to_ppm}") self.update_ppm_fitters() def really_quit(self): """called when user Ctrl-Q the app""" global user, just_quit if ( just_quit or QMessageBox.question( self, "", "Are you sure you want to quit?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No, ) == QMessageBox.Yes ): self._do_quit = True self.close() def closeEvent(self, event): """this is called when the user clicks the window's cross icon""" global user, just_quit if ( just_quit or self._do_quit or QMessageBox.question( self, "", "Are you sure you want to quit?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No, ) == QMessageBox.Yes ): sample_camera.disconnect() settings.setValue("window/geometry", self.saveGeometry()) settings.setValue("window/windowState", self.saveState()) settings.setValue("window/main_splitter", self._main_splitter.sizes()) settings.setValue("last_active", time.time()) QMainWindow.closeEvent(self, event) else: event.ignore() def is_recently_active(self): last_active = settings.value("last_active", 0, type=float) minutes_since_last = int((time.time() - last_active) / 60.0) return minutes_since_last < 60 def openPreferencesDialog(self): PreferencesDialog(self).exec_() def set_app_icon(self): scriptDir = os.path.dirname(os.path.realpath(__file__)) self.setWindowIcon( QtGui.QIcon(os.path.join(scriptDir + os.path.sep + "logo.png")) ) def set_widget_background_color(self, color): """change a widget's color :param color: :return: """ try: color = QtGui.QColor.colorNames().index(color) except: return p = self.palette() p.setColor(self.backgroundRole(), color) self.setPalette(p) def load_stylesheet(self): with open("swissmx.css", "r") as sheet: self.setStyleSheet(sheet.read()) def sigint_handler(*args): """Handler for the SIGINT signal.""" global app app.quit() def main(): global app import sys import argparse parser = argparse.ArgumentParser() parser.add_argument( "--number", "-n", help="number; default=100", type=int, default=100 ) parser.add_argument( "--float", "-f", help="float number; default=0.1", type=float, default=0.1 ) parser.add_argument( "--prefix", "-p", help="prefix; default=dark", type=str, default="bogus" ) parser.add_argument( "--local", help="local instance for testing (avoids " "connecting to some things)", action="store_true", ) parser.add_argument("--verbose", "-v", help="verbose flag", action="store_true") args = parser.parse_args() # needed so pycharm can restart application signal.signal(signal.SIGINT, sigint_handler) from PyQt5.QtWidgets import QApplication # set app icon app = QApplication(sys.argv) app_icon = QtGui.QIcon() app_icon.addFile("artwork/logo/16x16.png", QtCore.QSize(16, 16)) app_icon.addFile("artwork/logo/24x24.png", QtCore.QSize(24, 24)) app_icon.addFile("artwork/logo/32x32.png", QtCore.QSize(32, 32)) app_icon.addFile("artwork/logo/48x48.png", QtCore.QSize(48, 48)) app_icon.addFile("artwork/logo/256x256.png", QtCore.QSize(256, 256)) app.setWindowIcon(app_icon) splash_pix = QPixmap("artwork/logo/256x256.png") splash = QSplashScreen(splash_pix, Qt.WindowStaysOnTopHint) splash.setWindowFlags(Qt.WindowStaysOnTopHint | Qt.FramelessWindowHint) splash.setEnabled(False) progressBar = QProgressBar(splash) progressBar.setMaximum(10) progressBar.setGeometry(0, splash_pix.height() - 50, splash_pix.width(), 20) splash.show() # splash.showMessage("

Solid support scanning brought to you by

", Qt.AlignTop | Qt.AlignCenter, Qt.black) for i in range(1, 11): progressBar.setValue(i) t = time.time() while time.time() < t + 0.1: app.processEvents() main = Main() main.show() splash.finish(main) main.update_user_and_storage() sys.exit(app.exec_()) if __name__ == "__main__": app = None main()