Files
SwissMX/obsolete.py

3051 lines
106 KiB
Python

# **************** OBSOLETE AND/OR OLD STUFF ****************
class WndSwissMx(QMainWindow, Ui_MainWindow):
# **************** OBSOLETE AND/OR OLD STUFF ****************
# functions are prefixed with _OLD_
def _OLD_build_daq_methods_grid_tab(self):
self._grids = []
tab = self._tab_daq_method_grid
layout = tab.layout() # gridlayout
self._sb_grid_x_step.setValue(30)
self._sb_grid_y_step.setValue(30)
self._bt_add_grid.clicked.connect(self._OLD_daq_grid_add_grid)
self._bt_remove_all_grids.clicked.connect(self._OLD_daq_grid_remove_all)
self._find_targets_from_microscope_image.clicked.connect(self._OLD_daq_grid_findxtals)
self.addGridRequest.connect(self._OLD_daq_grid_add_grid)
self.gridUpdated.connect(self._OLD_daq_grid_update)
def _OLD_build_daq_methods_prelocated_tab(self):
tab = self._tab_daq_method_prelocated
self.prelocationModule = PrelocatedCoordinatesModel.PrelocatedCoordinates(parent=self)
tab.layout().addWidget(self.prelocationModule)
self.prelocationModule.prefixSelected.connect(lambda prefix: self._le_prefix.setText(prefix))
self.prelocationModule.dataFileLoaded.connect(self._OLD_daq_method_prelocated_update_markers)
self.prelocationModule.prelocatedDataUpdated.connect(self._OLD_daq_method_prelocated_update_markers)
self.prelocationModule.markersDeleted.connect(self._OLD_daq_method_prelocated_remove_markers)
self.fiducialPositionSelected.connect(self._OLD_daq_method_prelocated_set_fiducial)
self.appendPrelocatedPosition.connect(self._OLD_daq_method_prelocated_append_data)
self.prelocationModule.moveFastStageRequest.connect(self._OLD_move_fast_stage)
self._preloc_inspect_area = QPlainTextEdit()
tab.layout().addWidget(self._preloc_inspect_area)
def _OLD_build_daq_methods_embl_tab(self):
app = QApplication.instance()
self._tab_daq_method_embl.setLayout(QVBoxLayout())
layout = self._tab_daq_method_embl.layout()
#motors = self.get_gonio_motors()
self._embl_module = EmblWidget(self) #ZAC: orig. code
#self._embl_module.configure(motors, app._camera, app._zoom)
layout.addWidget(self._embl_module)
def _OLD_create_helical_widgets(self):
tbox = self._helical_tablebox
htab = self._helical_scan_table = HelicalTableWidget()
htab.gonioMoveRequest.connect(self._OLD_move_gonio_to_position)
tbox.setLayout(QVBoxLayout())
grp = QWidget()
grp.setLayout(QFormLayout())
le = QSpinBox()
le.setRange(1, 100)
le.setValue(htab.scanHorizontalCount())
le.valueChanged.connect(lambda cnt: htab.setScanHorizontalCount(cnt))
grp.layout().addRow("Horizontal Count", le)
le = QSpinBox()
le.setRange(1, 100)
le.setValue(htab.scanVerticalCount())
le.valueChanged.connect(lambda cnt: htab.setScanVerticalCount(cnt))
grp.layout().addRow("Vertical Count", le)
le = QDoubleSpinBox()
le.setRange(-180.0, 180.0)
le.setSingleStep(5.0)
le.setSuffix(" degrees")
le.valueChanged.connect(htab.setStartAngle)
grp.layout().addRow("Start angle", le)
tbox.layout().addWidget(grp)
widgi = QWidget()
widgi.setLayout(QHBoxLayout())
tbox.layout().addWidget(widgi)
but = QPushButton("Add Crystal")
but.clicked.connect(htab.add_xtal)
widgi.layout().addWidget(but)
but = QPushButton("Set START")
but.clicked.connect(lambda: htab.set_xtal_start(self.get_gonio_positions()))
widgi.layout().addWidget(but)
but = QPushButton("Set END")
but.clicked.connect(lambda: htab.set_xtal_end(self.get_gonio_positions()))
widgi.layout().addWidget(but)
tbox.layout().addWidget(htab)
def _OLD_add_beam_marker(self):
app = QApplication.instance()
cfg = app._cfg
w, h = cfg.value(AppCfg.GEO_BEAM_SZ)
self._beammark = bm = CstROI.BeamMark([100, 100], (int(w), int(h)), parent=self)
tr=QtGui.QTransform() # prepare ImageItem transformation:
tr.rotate(30)
bm.setTransform(tr) # assign transform
self.vb.addItem(self._beammark)
bm=UsrGO.BeamMark([50, 120], [30, 20])
self.vb.addItem(bm)
vi=UsrGO.Grid((120, -100), (200, 150), (30, 20), 2)
self.vb.addItem(vi)
def _OLD_camera_pause_toggle(self):
app=QApplication.instance()
app._camera.pause()
def _OLD_init_settings_tracker(self):
app=QApplication.instance()
cfg=app._cfg
_log.info("configuring widget persistence")
fields = {
# 'folder': (self._label_folder, str),
"project": (self._le_project, str),
"prefix": (self._le_prefix, str),
"actual_prefix": (self._label_actual_prefix, str),
"exposureTime": (self._dsb_exposure_time, float),
"oscillationAngle": (self._dsb_oscillation_step, float),
"blastRadius": (self._dsb_blast_radius, float),
}
for key, f_config in fields.items():
widget, conv = f_config
value = cfg.value(key)
try:
wset, wget = widget.setText, widget.text
_log.debug("tracking text field {}".format(key))
except AttributeError:
_log.debug("tracking {} number field {}".format(conv, key))
wset, wget = widget.setValue, lambda fget=widget.value: conv(fget())
except Exception as e:
_log.error(e)
try:
wset(conv(value))
except Exception as e:
_log.debug(e)
_log.warning('failed for "{}" updating field of type {} with {}'.format( key, type(widget), value))
finally:
# _log.debug('wget = {}; wset = {}'.format(wget, wset))
widget.editingFinished.connect(lambda w=widget, k=key, func_get=wget: self.persist_setting(k, func_get() ) )
#self.storage_cascade_prefix(None) #ZAC: orig. code
def _OLD_init_validators(self):
identifier_regex = QRegExp("[a-z-A-Z_0-9%]+")
self._le_project.setValidator(QRegExpValidator(identifier_regex, self._le_project))
self._le_prefix.setValidator(QRegExpValidator(identifier_regex, self._le_prefix))
def _OLD_wire_storage(self):
self._le_prefix.textChanged.connect(self._OLD_storage_cascade_prefix)
self._le_prefix.textChanged.connect(lambda newtext: self.prefixChanged.emit(newtext))
self._le_project.textChanged.connect(self._OLD_storage_cascade_prefix)
self._le_project.textChanged.connect(lambda newtext: self.projectChanged.emit(newtext))
self._le_prefix.editingFinished.connect(self._OLD_prepare_daq_folder)
self._le_project.editingFinished.connect(self._OLD_prepare_daq_folder)
self.increaseRunNumberRequest.connect(self._OLD_increase_run_number)
def _OLD_storage_cascade_prefix(self, val):
prefix = self._le_prefix.text()
if 0 == len(prefix):
_log.warning("empty prefix is not accepted")
self._le_prefix.setAccessibleName("invalid_input")
self._le_prefix.blockSignals(True)
self._le_prefix.setText("INVALID=>" + prefix)
QMessageBox.warning(self, "prefix is not valid", "Prefix is not valid!")
self._le_prefix.blockSignals(False)
return
else:
self._le_prefix.setAccessibleName("")
project = self._le_project.text()
folders.set_prefix(prefix)
folders.set_project(project)
folders.run = settings.value("run_number", type=int)
self._label_runnumber.setText(f"{folders.run:04d}")
self._data_folder = folders.raw_folder
self.folderChanged.emit(folders.raw_folder)
self._label_actual_prefix.setText(folders.prefix)
self._label_folder.setText(folders.raw_folder)
def _OLD_increase_run_number(self):
run = settings.value("run_number", type=int)
run += 1
settings.setValue("run_number", run)
folders.run = run
self._label_runnumber.setText(f"{run:04d}")
def _OLD_prepare_daq_folder(self):
global home, folders
prefix = folders.prefix
folder = folders.res_folder
if 0 == len(prefix):
return
try:
os.makedirs(folder, 0o750, exist_ok=True)
except:
msg = "Failed to create folder: {}".format(folder)
_log.warning(msg)
QMessageBox.warning(self, "Screenshot: failed to create folder", "Failed to create output folder for screenshot!\n\n\tScreenshot not taken!",)
raise
fname = os.path.join(folders.pgroup_folder, ".latest_raw")
try:
with open(fname, "w") as f:
f.write(folders.raw_folder)
_log.info("wrote: {}".format(fname))
except:
_log.warning("failed writing {}".format(fname))
fname = os.path.join(folders.pgroup_folder, ".latest_res")
try:
with open(fname, "w") as f:
f.write(folders.res_folder)
_log.info("wrote: {}".format(fname))
except:
_log.warning("failed writing {}".format(fname))
def _OLD_persist_setting(self, s, v):
app=QApplication.instance()
cfg=app._cfg
_log.debug("persisting {} = {}".format(s, v))
cfg.setValue(s, v)
def _OLD_method_changed(self, index):
method = self._tabs_daq_methods.currentWidget().accessibleName()
_log.info("method now => {}".format(method))
def _OLD_check_zescape(self):
msg = zescape.check()
if msg is None:
return
if "current" in msg:
_log.warning(f"current state: {self._esc_state}")
zescape.reply(self._esc_state)
elif "goto" in msg:
state = msg.split()[1].lower()
_log.warning(f"TELL requests to go to {state}")
try:
if "sampleexchange" in state:
_log.debug(
f"moving to mount with offset = {self._pin_mounting_offset}"
)
self.move_gonio_to_mount_position(offset=self._pin_mounting_offset)
elif "samplealignment" in state:
self.cb_esc_sample_alignment()
except:
zescape.reply("Maintenance")
zescape.reply(self._esc_state)
else: # JSON
data = json.loads(msg)
if "sampleName" in data:
_log.debug(f"TELL SAMPLE DATA => {data}")
self.tell2storage(data)
zescape.reply("ack")
elif "pin_offset" in data:
_log.debug(f"TELL pin offset => {data}")
self._pin_mounting_offset = data["pin_offset"]
zescape.reply("ack")
elif "get_pin_offset" in data:
_log.debug(f"TELL get pin offset => {data}")
zescape.reply_json({"pin_offset": self._pin_mounting_offset})
def _OLD_tell2storage(self, sample):
_log.debug(f"2 TELL SAMPLE DATA => {type(sample)}")
self._le_prefix.setText(sample["sampleName"])
self._le_project.setText(sample["sampleFolder"])
tstf = folders.get_prefixed_file("_newsample")
self.storage_cascade_prefix(None)
_log.warning(f"sample info: {tstf}")
def _OLD_is_task(self, task):
return task == self._active_task
def _OLD_get_task_menu(self):
pass
def _OLD_toggle_shutter(self, **kwargs):
if self._pv_shutter:
if 0 == self._pv_shutter.get():
self._pv_shutter.put(1)
self._button_shutter.setText("shutter opened\n\u2622")
else:
self._pv_shutter.put(0)
self._button_shutter.setText("shutter closed\n\u2622")
elif self._has_pulse_picker:
pulsePicker.toggle()
def _OLD_update_shutter_label(self, pvname, value, char_value, **kwargs):
if 0 == value:
self._button_shutter.setText("shutter closed")
else:
self._button_shutter.setText("shutter opened")
def _OLD_build_sample_selection_tab(self):
self._sample_selection = sample_selection.SampleSelection(self)
self._sample_selection.move_to_mount_position = (self.move_gonio_to_mount_position)
self._tab_sample_selection.setLayout(QVBoxLayout())
self._tab_sample_selection.layout().addWidget(self._sample_selection)
self._tab_sample_selection.layout().addStretch(2)
def _OLD_build_embl_group(self):
grp = QGroupBox("EMBL Acquisition")
layout = QFormLayout()
grp.setLayout(layout)
layout.addWidget(QLabel("Prefix"))
self._embl_prefix = QLineEdit("img")
layout.addWidget(self._embl_prefix)
def _OLD_abort_measurement(self, ev=None):
if settings.value(ACTIVATE_PULSE_PICKER):
pulsePicker.close()
jungfrau_detector.abort()
delta_tau.abort()
_log.debug("aborting measurement")
def _OLD_trigger_detector(self, **kwargs):
if self._pv_shutter is not None:
self._pv_shutter.put(0)
# self._eiger_button_collect.show()
# self._eiger_button_abort.hide()
# self._eiger_now_collecting_label.setText(
# "Finished sequence id: {}\n"
# "Data in: Data10/{}".format(
# self._detector_sequence_id, self._eiger_now_collecting_file
# )
# )
def _OLD_modify_camera_transform(self, t):
if t == "remove_all":
sample_camera.set_transformations([])
elif t == "undo_last":
sample_camera._transformations.pop()
#elif type(t) ==type(camera.Transforms): #ZAC: orig. code
# sample_camera.append_transform(t)
try:
label = ", ".join([t.name for t in sample_camera._transformations])
except:
label = ""
self._label_transforms.setText(label)
#settings.setValue(CAMERA_TRANSFORMATIONS, sample_camera._transformations) #ZAC: orig. code
def _OLD_roi_add_line(self):
roi = pg.LineSegmentROI(
[200, 200],
[300, 300],
pen="r",
scaleSnap=True,
translateSnap=True,
rotateSnap=True,
removable=True,
)
# roi.sigRegionChanged.connect(self.track_roi)
roi.sigRemoveRequested.connect(self.remove_roi)
self.vb.addItem(roi)
self._rois.append(roi)
def _OLD_roi_add_rect(self):
roi = pg.RectROI(
[200, 200],
[50, 50],
pen="r",
scaleSnap=True,
translateSnap=True,
rotateSnap=True,
removable=True,
)
roi.sigRegionChanged.connect(self.track_roi)
roi.sigRemoveRequested.connect(self.remove_roi)
self.vb.addItem(roi)
self._rois.append(roi)
def _OLD_remove_roi(self, roi):
self.vb.removeItem(roi)
self._rois.remove(roi)
def _OLD_prepare_microscope_page(self):
layout = self.microscope_page.layout()
container = QWidget()
hlay = QHBoxLayout()
container.setLayout(hlay)
layout.addWidget(container)
def _OLD_update_beam_marker(self, zoom_lvl):
w, h = settings.value(BEAM_SIZE)
try:
bx = self.beamx_fitter(zoom_lvl)
by = self.beamy_fitter(zoom_lvl)
ok = True
except:
ok = False
_log.warning("beam marker not defined")
return
_log.debug("updating beam mark to {:.1f}x{:.1f}".format(bx, by))
self.beamCameraCoordinatesChanged.emit(bx, by)
def _OLD_update_beam_marker_fitters(self):
if len(self._beam_markers) > 2:
_log.debug("defining beam marker")
bx = [(n, x[0]) for n, x in self._beam_markers.items()]
by = [(n, x[1]) for n, x in self._beam_markers.items()]
nbx = np.asarray(bx).T
nby = np.asarray(by).T
bx_coefs = np.polyfit(nbx[0], nbx[1], 3)
by_coefs = np.polyfit(nby[0], nby[1], 3)
_log.debug(".... beam marker X coeficients {}".format(bx_coefs))
_log.debug(".... beam marker Y coeficients {}".format(by_coefs))
self.beamx_fitter = np.poly1d(bx_coefs)
self.beamy_fitter = np.poly1d(by_coefs)
def _OLD_append_to_beam_markers(self, x, y, zoom):
self._beam_markers[zoom] = (x, y)
_log.info("beam markers {}".format(self._beam_markers))
settings.setValue(BEAM_MARKER_POSITIONS, self._beam_markers)
self.update_beam_marker_fitters()
def _OLD_remove_beam_markers(self):
self._beam_markers = {}
self.beamx_fitter = None
self.beamy_fitter = None
def _OLD_track_roi(self, roi):
x, y = roi.pos()
w, h = roi.size()
# area = roi.getArrayRegion(self._im, self.img)
# sum = np.sum(area)
# _log.info('{} => sum {}'.format((x,y), sum))
bx, by = x + w / 2., y + h / 2.
_log.info("beam pos {}".format((bx, by)))
_log.info("marker pos = {} ; size = {}".format((x, y), (w, h)))
def _OLD_toggle_mouse_tracking(self):
if self._mouse_tracking:
self.disengage_mouse_tracking()
else:
self.engage_mouse_tracking()
def _OLD_engage_mouse_tracking(self):
self.glw.scene().sigMouseMoved.connect(self.cb_mouse_move)
self.glw.scene().sigMouseMoved.emit()
self._mouse_tracking = True
def _OLD_disengage_mouse_tracking(self):
self.glw.scene().sigMouseMoved.disconnect(self.cb_mouse_move)
self._mouse_tracking = False
self._lb_coords.setText("")
def _OLD_get_beam_mark_on_camera_xy(self):
app=QApplication.instance()
z = app._zoom.get_val()
try:
bx = self.beamx_fitter(z)
by = self.beamy_fitter(z)
except:
bx, by = 500, 500
return (bx, by)
def _OLD_move_gonio_to_position(self, fx, fy, bx, bz, omega):
self.tweakers["fast_x"].motor.move(fx, wait=False, ignore_limits=True)
self.tweakers["fast_y"].motor.move(fy, wait=False, ignore_limits=True)
self.tweakers["base_x"].motor.move(bx, wait=False, ignore_limits=True)
self.tweakers["base_z"].motor.move(bz, wait=False, ignore_limits=True)
self.tweakers["omega"].motor.move(omega, wait=False, ignore_limits=True)
def _OLD_get_gonio_motors(self, as_json=False):
if as_json:
return {
"fast_x": self.tweakers["fast_x"].motor,
"fast_y": self.tweakers["fast_y"].motor,
"base_x": self.tweakers["base_x"].motor,
"base_z": self.tweakers["base_z"].motor,
"omega": self.tweakers["omega"].motor,
}
else:
return (
self.tweakers["fast_x"].motor,
self.tweakers["fast_y"].motor,
self.tweakers["base_x"].motor,
self.tweakers["base_z"].motor,
self.tweakers["omega"].motor,
)
def _OLD_get_gonio_tweakers(self):
return (
self.tweakers["fast_x"],
self.tweakers["fast_y"],
self.tweakers["base_x"],
self.tweakers["base_z"],
self.tweakers["omega"],
)
def _OLD_get_gonio_positions(self, as_json: bool = False):
fx, fy, cx, cz, omega = (
self.tweakers["fast_x"].motor,
self.tweakers["fast_y"].motor,
self.tweakers["base_x"].motor,
self.tweakers["base_z"].motor,
self.tweakers["omega"].motor,
)
a, b, c, d, e = (
fx.get_position(),
fy.get_position(),
cx.get_position(),
cz.get_position(),
omega.get_position(),
)
if as_json:
return {"fx": a, "fy": b, "bx": c, "bz": d, "omega": e}
else:
return (a, b, c, d, e)
def _OLD_escape_goToTellMountPosition(self):
self.move_gonio_to_mount_position()
self.lock_goniometer()
def _OLD_move_gonio_to_mount_position(self, offset: float = 0.0):
fx, fy, cx, cz, omega = self.get_gonio_motors()
bmark = "bookmark_0"
try:
t_fx = float(settings.value(bmark + "/mount_fx"))
t_fy = -offset + float(settings.value(bmark + "/mount_fy"))
t_cx = float(settings.value(bmark + "/mount_cx"))
t_cz = float(settings.value(bmark + "/mount_cz"))
t_omega = float(settings.value(bmark + "/mount_omega"))
except:
raise IncompleteConfiguration("TELL sample changer mount position is not configured!!!")
fx.move(t_fx, wait=True, ignore_limits=True)
fy.move(t_fy, wait=True, ignore_limits=True)
cx.move(t_cx, wait=True, ignore_limits=True)
cz.move(t_cz, wait=True, ignore_limits=True)
omega.move(t_omega, wait=True, ignore_limits=True)
app_utils.assert_motor_positions(
[
(fx, t_fx, 0.01),
(fy, t_fy, 0.01),
(cx, t_cx, 0.01),
(cz, t_cz, 0.01),
(omega, t_omega, 0.01),
]
)
self.cb_esc_sample_exchange()
def _OLD_lock_goniometer(self):
# tell.set_in_mount_position(True)
res = QMessageBox.question(self, "", "Mount a sample from console and click ok once the sample is mounted.", QMessageBox.Ok, QMessageBox.Ok,)
res = QMessageBox.question(self, "", "Is the sample is mounted?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No,)
if res in (QMessageBox.Yes, QMessageBox.No):
# tell.set_in_mount_position(False)
pass
@pyqtSlot(int)
def _OLD_saveBookmark(self, key: int):
"""save a bookmark for the corresponding key is the Qt.Key_* code """
fx, fy, cx, cz, omega = (
self.tweakers["fast_x"].motor,
self.tweakers["fast_y"].motor,
self.tweakers["base_x"].motor,
self.tweakers["base_z"].motor,
self.tweakers["omega"].motor,
)
bmark = "bookmark_{}".format(key)
if key == 0:
ans = QMessageBox.question(self, "Override TELL mount position", "This will overwrite the positions used for TELL MOUNTING!!!\n\n\tContinue ?",)
if ans != QMessageBox.Yes:
return
_log.info(
"saving bookmark {}: {}, {}, {}, {}, {}".format(
bmark,
fx.get_position(),
fy.get_position(),
cx.get_position(),
cz.get_position(),
omega.get_position(),
)
)
settings.setValue(bmark + "/mount_fx", fx.get_position())
settings.setValue(bmark + "/mount_fy", fy.get_position())
settings.setValue(bmark + "/mount_cx", cx.get_position())
settings.setValue(bmark + "/mount_cz", cz.get_position())
settings.setValue(bmark + "/mount_omega", omega.get_position())
def _OLD_gotoBookmark(self, key: int):
"""save a bookmark for the corresponding key"""
fx, fy, cx, cz, omega = (
self.tweakers["fast_x"].motor,
self.tweakers["fast_y"].motor,
self.tweakers["base_x"].motor,
self.tweakers["base_z"].motor,
self.tweakers["omega"].motor,
)
bmark = "bookmark_{}".format(key)
try:
t_fx = float(settings.value(bmark + "/mount_fx"))
t_fy = float(settings.value(bmark + "/mount_fy"))
t_cx = float(settings.value(bmark + "/mount_cx"))
t_cz = float(settings.value(bmark + "/mount_cz"))
t_omega = float(settings.value(bmark + "/mount_omega"))
except:
return
fx.move(t_fx, wait=True, ignore_limits=True)
fy.move(t_fy, wait=True, ignore_limits=True)
cx.move(t_cx, wait=True, ignore_limits=True)
cz.move(t_cz, wait=True, ignore_limits=True)
omega.move(t_omega, wait=True, ignore_limits=True)
def _OLD_move_cryojet_nozzle(self, pos):
cx = self.tweakers["cryo"]
if "in" == pos.lower():
key = CRYOJET_NOZZLE_IN
elif "out" == pos.lower():
key = CRYOJET_NOZZLE_OUT
to_pos = settings.value(key, 1e10, type=float)
if to_pos > 1e9:
raise IncompleteConfiguration(f"CRYOJET configuration is incomplete! Missing {key}")
cx.move_abs(to_pos, assert_position=True)
def _OLD_build_cryo_group(self, toolbox):
pfx=QApplication.instance()._cfg.value(AppCfg.GBL_DEV_PREFIX)[0]
qutilities.add_item_to_toolbox(
toolbox,
"Cryojet",
widget_list=[
self.get_tweaker(f"{pfx}:MOT_CRYO", alias="cryo", label="cryo X")
],
)
def _OLD_build_wegde_group(self, toolbox):
pfx=QApplication.instance()._cfg.value(AppCfg.GBL_DEV_PREFIX)[0]
qutilities.add_item_to_toolbox(toolbox,"Wedge Mover",
widget_list=[
self.get_tweaker(f"{pfx}:MOT_WEDGE1", alias="wedge_1", label="wedge_1"),
self.get_tweaker(f"{pfx}:MOT_WEDGE2", alias="wedge_2", label="wedge_2"),
self.get_tweaker(f"{pfx}:MOT_WEDGE3", alias="wedge_3", label="wedge_3"),
self.get_tweaker(f"{pfx}:MOT_WEDGE4", alias="wedge_4", label="wedge_4"),
self.get_tweaker(f"{pfx}:MOT_WEDGEX", alias="wedge_x", label="wedge_x"),
self.get_tweaker(f"{pfx}:MOT_WEDGEY", alias="wedge_y", label="wedge_y"),
self.get_tweaker(f"{pfx}:MOT_WEDGEA", alias="wedge_a", label="wedge_a"),
self.get_tweaker(f"{pfx}:MOT_WEDGEB", alias="wedge_b", label="wedge_b"),
],
)
def _OLD_build_slits_group(self, toolbox):
pfx=QApplication.instance()._cfg.value(AppCfg.GBL_DEV_PREFIX)[0]
qutilities.add_item_to_toolbox(
toolbox,
"Slits",
widget_list=[
self.get_tweaker(f"{pfx}10", alias="slit_right", label="left", mtype=1, ),
self.get_tweaker(f"{pfx}11", alias="slit_left", label="right", mtype=1,),
self.get_tweaker(f"{pfx}12", alias="slit_bottom", label="bottom", mtype=1,),
self.get_tweaker(f"{pfx}13",alias="slit_top",label="top",mtype=1,),
],
)
def _OLD_assert_post_tube_position(self, pos):
x_up = settings.value("post_sample_tube/x_up", 1e10, type=float)
y_up = settings.value("post_sample_tube/y_up", 1e10, type=float)
x_down = settings.value("post_sample_tube/x_down", 1e10, type=float)
y_down = settings.value("post_sample_tube/y_down", 1e10, type=float)
dx = settings.value("post_sample_tube/dx", 1e10, type=float)
dy = settings.value("post_sample_tube/dy", 1e10, type=float)
tz_in = settings.value("post_sample_tube/z_in", 1e10, type=float)
tz_out = settings.value("post_sample_tube/z_out", 1e10, type=float)
if (x_up + y_up + x_down + y_down + dx + dy + tz_in + tz_out) > 1e9:
raise Exception("miscofingured positions for post-sample tube")
usy = self.tweakers["tube_usy"]
dsy = self.tweakers["tube_dsy"]
usx = self.tweakers["tube_usx"]
dsx = self.tweakers["tube_dsx"]
tbz = self.tweakers["tube_z"]
if pos == "in":
yu = y_up
xu = x_up
yd = y_down
xd = x_down
z = tz_in
elif pos == "out":
yu = y_up + dy
xu = x_up + dx
yd = y_down + dy
xd = x_down + dx
z = tz_out
app_utils.assert_tweaker_positions([
(usy, yu, 0.1),
(dsy, yd, 0.1),
(usx, xu, 0.1),
(dsx, xd, 0.1),
(tbz, z, 0.1),],timeout=2.0,
)
def _OLD_add_tweaker(self, pv, alias=None, label=None, mtype="epics_motor", layout=None):
if layout is None:
layout = self._tweak_container.layout()
if mtype == "epics_motor":
m = MotorTweak()
else:
m = SmaractMotorTweak()
layout.addWidget(m)
m.connect_motor(pv, label)
self.tweakers[alias] = m
def _OLD_done_sliding(self):
print("done sliding at {}".format(self.slider_fast_x.value()))
def _OLD_daq_grid_add_grid(self, gx=None, gy=None):
grid_index = len(self._grids)
if gx in (False, None):
gx=self.tweakers["fast_x"].get_rbv()
gy=self.tweakers["fast_y"].get_rbv()
xstep = self._sb_grid_x_step.value()
ystep = self._sb_grid_y_step.value()
xoffset = self._sb_grid_x_offset.value()
yoffset = self._sb_grid_y_offset.value()
app=QApplication.instance()
geo=app._geometry
oc=geo._opt_ctr
if xstep==0:
go=UsrGO.Grid((120, -100), (200, 150), (30, 22), 2)
elif xstep==1:
go=UsrGO.FixTargetFrame((120, -100), (200, 150), tpl='test')
elif xstep==2:
v=geo.pos2pix((12.5, 0))
l=np.linalg.norm(v)
go=UsrGO.FixTargetFrame(-oc, (l,l), tpl='12.5x12.5')
elif xstep==3:
v=geo.pos2pix((23, 0))
l=np.linalg.norm(v)
go=UsrGO.FixTargetFrame(-oc, (l,l), tpl='23.0x23.0')
else:
_log.error('set xstep 0..2 for tests')
self.vb.addItem(go)
self._goTracked['objLst'].append(go)
#grid = CstROI.Grid( x_step=xstep, y_step=ystep, x_offset=xoffset, y_offset=yoffset, gonio_xy=(gx, gy), grid_index=grid_index, parent=self,)
#self.vb.addItem(grid)
#grid.calculate_gonio_xy()
#grid.sigRemoveRequested.connect(lambda g=grid: self.remove_grid(g))
def _OLD_daq_grid_remove_all(self):
vb=self.vb
for go in self._goTracked['objLst']:
vb.removeItem(go)
self._goTracked['objLst']=[]
def _OLD_grids_pause_stage_tracking(self):
for grid in self._grids:
grid.disable_stage_tracking()
def _OLD_grids_start_stage_tracking(self):
for grid in self._grids:
grid.enable_stage_tracking()
def _OLD_remove_grid(self, grid):
self.vb.removeItem(grid)
self._grids.remove(grid)
def _OLD_daq_grid_update(self, grid_index):
try:
grid = self._grids[grid_index]
except:
print("grid index not yet there")
return
points = grid.get_grid_targets()
num_points = len(points)
etime = float(settings.value("exposureTime"))
doc = f"grid_{grid_index} = ["
for n, pos in enumerate(points):
x, y = pos
doc += "[{:8.3f}, {:8.3f}],\n".format(x, y)
doc += "]"
self._grid_inspect_area.setPlainText(doc)
m = "Number of points: {}\nEstimated Time: {:.1f} minutes".format(num_points, num_points * etime / 60.)
self._label_grid_parameters.setText(m)
def _OLD_daq_embl_collect_points(self):
coords = self._embl_module.coords
points = [[x, y] for x, y, bx, bz, o in coords]
points = np.array(points)
method = "trajectory"
xp = (1000 * points).astype(int) # microns then round int
params = (xp[:, 0].tolist(), xp[:, 1].tolist())
self.daq_collect_points(points, visualizer_method=method, visualizer_params=params)
def _OLD_daq_prelocated_collect_points(self):
points = []
data = self.prelocationModule.get_collection_targets()
for n, cc in enumerate(data):
is_fiducial, gx, gy, cx, cy, ox, oy = cc
points.append([gx, gy])
points = np.array(points)
method = "trajectory"
xp = (1000 * points).astype(int) # microns then round int
params = (xp[:, 0].tolist(), xp[:, 1].tolist())
self.daq_collect_points(points, visualizer_method=method, visualizer_params=params)
def _OLD_daq_grid_findxtals(self):
feature_size = self._sb_findxtals_feature_size.value()
image = sample_camera.get_image()
findObj(-image, objSize=feature_size, viz=1)
def _OLD_check_jungfrau_save(self) -> bool:
if jungfrau_detector.is_running_detector():
saveRaw = jungfrau_detector.is_saving_data()
if not saveRaw:
box = QMessageBox()
box.setText("Jungfrau save data disabled!")
box.setInformativeText("Jungfrau save data is disabled!")
box.setIcon(QMessageBox.Warning)
box.setDetailedText("Choose to abort, enable and continue, or continue without saving raw data")
btContinue = box.addButton("Continue", QMessageBox.YesRole)
btAbort = box.addButton("OMG! Abort", QMessageBox.NoRole)
btEnable = box.addButton("Enable save and continue", QMessageBox.YesRole)
box.exec_()
ans = box.clickedButton()
if ans == btEnable:
jungfrau_detector.set_save_raw(True)
return True
elif ans == btAbort:
_log.info("not doing helical scan")
return False
return True
return True
def _OLD_daq_collect_points(self, points, visualizer_method, visualizer_params):
app = QApplication.instance()
cfg = app._cfg
verbose=0xff
fn='/tmp/shapepath'
try:
dt=app._deltatau
except AttributeError:
app._deltatau=dt=deltatau.Deltatau()
try:
jf=app._jungfrau
except AttributeError:
app._jungfrau=jf=detector.Jungfrau()
sp=dt._shapepath
sp.gen_grid_points(w=15, h=15, pitch=3, rnd=0, ofs=(0, +2000))
#sp.gen_grid_points(w=5, h=10, pitch=1, rnd=0, ofs=(0, 0));sp.sort_points(False, 10);sp.points
sp.sort_points(False, 15);
sp.meta['pt2pt_time']=10
sp.setup_gather()
sp.setup_sync(verbose=verbose&32, timeOfs=0.05)
sp.setup_coord_trf() # reset to shape path system
# sp.meta['pt2pt_time']=10 #put between setup_sync and setup_motion to have more motion points than FEL syncs
sp.setup_motion(fnPrg=fn+'.prg', mode=3, scale=1., dwell=10)
sp.homing() # homing if needed
sp.run() # start motion program
sp.wait_armed() # wait until motors are at first position
sp.trigger(0.5) # send a start trigger (if needed) ater given time
if not dt._comm is None:
while True:
p=int(sp.progress())
if p<0: break
#print('progress %d/%d'%(p, num_pts))
time.sleep(.1)
sp.gather_upload(fnRec=fn+'.npz')
#dp=deltatau.shapepath.DebugPlot(sp)
#dp.plot_gather(mode=11)
#plt.show(block=False)
#plt.show(block=True)
return
task = self.active_task()
XDIR = -1
#folders.make_if_needed()
#if ( cfg.option(AppCfg.ACTIVATE_PULSE_PICKER) and not jungfrau_detector.is_running_detector()):
# if QMessageBox.No == QMessageBox.question(self, "X-rays but no Jungfrau",
# "X-rays will be used bu the Jungfrau will not run.\n\n\tContinue?",):
# _log.warning("user forgot to turn on the jungfrau")
# return
#if option(ACTIVATE_PULSE_PICKER) or not option(SKIP_ESCAPE_TRANSITIONS_IF_SAFE):
# self.escape_goToDataCollection()
points *= 1000 # delta tau uses micrometers
points[:, 0] *= XDIR # fast X axis is reversed
# sync_mode : default=2
# 0 : no sync at all
# 1 : synchronize start
# 2 : synchronize start and adapt motion speed
# this function generates the code blocks:
# self.sync_wait and self.sync_run
# sync_wait can be put in the program to force a timing sync
# sync_run are the commands to run the whole program
# sync_flag if not using jungfrau =1 otherwise =0
# D.O. shapepath.meta.update(sync_mode=2, sync_flag=1)
sp.meta.update(sync_mode=0, sync_flag=0)
maxacq_points = 116508
samp_time = 0.0002 # s
overhead_time = 0.1
etime=10
vscale=1.0
#etime = settings.value("exposureTime", type=float)
#vscale = settings.value(DELTATAU_VELOCITY_SCALE, 1.0, type=float)
#sort_points = option(DELTATAU_SORT_POINTS)
acq_per = int(np.ceil((etime * len(points) + overhead_time) / (maxacq_points * samp_time)))
_log.info(f"gather acquisotion period = {acq_per}")
_log.info(f"velocity scale {vscale}")
sp.setup_gather(acq_per=acq_per)
sp.setup_sync(verbose=True)
sp.setup_coord_trf()
assert(points.dtcfgype==np.float64)
sp.points = points
if TASK_GRID == task:
# width, height = visualizer_params
# _log.debug(f"grid: {width} x {height}")
# details_1 = [width]
# details_2 = [height]
# sp.sort_points(xy=False, grp_sz=height)
pass
elif task in (TASK_PRELOCATED, TASK_EMBL):
if sort_points:
shapepath.sort_points()
self.daq_method_prelocated_remove_markers()
details_1, details_2 = visualizer_params
sp.setup_motion(
mode=3, # 1 = bad pvt 3 = pft (pvt via inverse fourier transform)
pt2pt_time=etime * 1000.,
#fnPrg=folders.get_prefixed_file("_program.prg"),
scale=vscale, # velocity at target position scaling: 1=optimal speed, 0=zero speed
dwell=10, # milli-seconds wait
)
sp.run()
self.qprogress = QProgressDialog(self)
self.qprogress.setRange(0, 0)
self.qprogress.setLabelText("Acquiring GRID")
self.qprogress.setCancelButtonText("Abort Acquisition")
self.qprogress.canceled.connect(self.complete_daq)
self.qprogress.setWindowModality(Qt.WindowModal)
self.qprogress.setAutoClose(True)
self.qprogress.show()
sequencer_steps = [lambda: self.grids_pause_stage_tracking()]
if jungfrau_detector.is_running_detector():
if not self.check_jungfrau_save():
# user aborted run from save data dialog
return
n_frames = ntrigger
uid = settings.value(EXPERIMENT_UID, type=int)
backend_extras = self.jungfrau.get_parameters()
backend_extras.update(
{
"swissmx_trajectory_method": visualizer_method,
"swissmx_trajectory_details_1": details_1,
"swissmx_trajectory_details_2": details_2,
}
)
jungfrau_detector.set_number_of_frames(n_frames)
jungfrau_detector.set_data_owner_uid(uid)
sequencer_steps.extend(
[
lambda: jungfrau_detector.configure(
n_frames=n_frames,
outfile=folders.prefix,
outdir=folders.raw_folder,
uid=uid,
backend_extras=backend_extras,
),
lambda: jungfrau_detector.arm(),
]
)
sequencer_steps.append(lambda: shapepath.wait_armed())
if option(ACTIVATE_PULSE_PICKER):
sequencer_steps.append(lambda: pulsePicker.open())
# if settings.value("scanning/trigger_microscope_camera", type=bool):
# sample_camera.switch_to_trigger(True)
sequencer_steps.append(lambda: shapepath.trigger(wait=0.5))
def _OLD_shapepath_progress():
while True:
p = shapepath.progress()
if p < 0:
break
time.sleep(0.1)
self.qprogress.setLabelText(f"Acquiring GRID {p:.0f} / {ntrigger}")
_log.warning(f"motion complete!")
# sample_camera.stop_camera()
# sample_camera.switch_to_trigger(False)
# sample_camera.save_buffer_series(folders.prefix)
sequencer_steps.append(shapepath_progress)
if option(ACTIVATE_PULSE_PICKER):
sequencer_steps.append(lambda: pulsePicker.close())
sequencer_steps.append(lambda: jungfrau_detector.wait_finished())
sequencer_steps.append(lambda: self.grids_start_stage_tracking())
self.sequencer = Sequencer(steps=sequencer_steps)
self._thread = QThread()
self._thread.setObjectName("acquisition_thread")
self.sequencer.moveToThread(self._thread)
self.sequencer.finished.connect(self.daq_collect_points_wrapup)
self._thread.started.connect(self.sequencer.run_sequence)
self._thread.start()
def _OLD_run_steps(self, steps, title, at_end=None, cancelable=False):
dlg = QProgressDialog(self)
dlg.setWindowTitle(title)
dlg.setWindowModality(Qt.WindowModal)
dlg.setMinimumDuration(0)
if not cancelable:
dlg.setCancelButton(None)
dlg.setRange(0, 0)
dlg.setLabelText(f"<b>{title}</b><br/>")
dlg.setAutoClose(True)
dlg.show()
dlg.setValue(random.randint(1, 20))
class Runner(QObject):
finito = pyqtSignal()
timeoutExpired = pyqtSignal()
errorHappened = pyqtSignal(str)
result = pyqtSignal(str)
def __init__(self, step_to_run):
super().__init__()
self.step = step_to_run
self.exception = None
self.done = False
def run(self):
try:
self.step()
except Exception as e:
_log.debug(" +> step exception")
self.exception = str(e)
self.errorHappened.emit(str(e))
self.finito.emit()
for n, step in enumerate(steps):
_log.info(f"running step {step.title}")
dlg.setLabelText(f"<b>{title}</b><br/><font color=\"blue\">{step.title}</font>")
dlg.setValue(n)
thread = QThread()
runner = Runner(step)
runner.moveToThread(thread)
thread.started.connect(runner.run)
runner.finito.connect(thread.quit)
thread.start()
while thread.isRunning():
dlg.setValue(random.randint(1, 20))
time.sleep(0.01)
if dlg.wasCanceled():
# FIXME ensure we abort the running thread
break
if dlg.wasCanceled():
break
if runner.exception is not None:
QMessageBox.critical(self, step.title, str(runner.exception))
break
if dlg.wasCanceled():
_log.error(f"sequence {title} was cancelled by user")
raise AcquisitionAbortedException(f"sequence {title} was cancelled by user")
if at_end is not None:
at_end()
dlg.reset()
def _OLD_daq_collect_points_wrapup(self):
self.qprogress.reset()
if self._thread.isRunning():
self._thread.quit()
shapepath.gather_upload(os.path.join(folders.res_folder, folders.prefix + ".npz"))
if option(DELTATAU_SHOW_PLOTS):
dp = DebugPlot(shapepath)
dp.plot_gather(mode=1)
pyplot.show()
if TASK_PRELOCATED == self.active_task():
self.daq_method_prelocated_update_markers()
if option(ACTIVATE_PULSE_PICKER) or not option(SKIP_ESCAPE_TRANSITIONS_IF_SAFE):
self.cb_esc_sample_alignment()
sequence = {"delta tau program": shapepath.prg, "points": shapepath.points.tolist(), "timestamp": tdstamp(),}
sfname = folders.get_prefixed_file("_ScanInfo.json")
try:
with open(sfname, "w") as sf:
_log.info("writing scan info into: {}".format(sfname))
sf.write(json.dumps(sequence))
except:
_log.warning(f"failed to write scan info to {sfname}")
self.re_connect_collect_button()
jungfrau_detector.abort()
self.increaseRunNumberRequest.emit()
def _OLD_daq_collect_update_inspect(self, msg):
te = self._inspect
m = te.toPlainText()
te.setPlainText(m + msg + "\n")
te.verticalScrollBar().setValue(te.verticalScrollBar().maximum())
def _OLD_daq_helical_collect(self):
"""[
[{
0: (0.0, 1.238401694734829, 3.527, 0.936, 0.001),
120: (0.0, 1.3890393596274455, -1.957242824091597, -0.5639999999999998, 120.001),
240: (0.0, 1.3866130349657206, -1.1642619618562273, 3.105, 240.001)
},
{
0: (0.0, 1.238401694734829, 3.527, 0.936, 0.001),
120: (0.0, 1.3890393596274455, -1.957242824091597, -0.5639999999999998, 120.001),
240: (0.0, 1.3866130349657206, -1.1642619618562273, 3.105, 240.001)
}]
]
"""
_log.info("executing collection")
htab = self._helical_scan_table
num_h = htab.scanHorizontalCount()
num_v = htab.scanVerticalCount()
if ( settings.value(ACTIVATE_PULSE_PICKER) and not jungfrau_detector.is_running_detector()):
if QMessageBox.No == QMessageBox.question(self, "X-rays but no Jungfrau",
"X-rays will be used bu the Jungfrau will not run.\n\n\tContinue?",):
_log.warning("user forgot to turn on the jungfrau")
return
if option(ACTIVATE_PULSE_PICKER) or not option(SKIP_ESCAPE_TRANSITIONS_IF_SAFE):
self.cb_esc_data_collection()
folders.make_if_needed()
data = htab.get_data()
_log.debug(data)
start, end = data[0]
FX, FY, BX, BZ, O = range(5)
x = (
(-1000 * start[0][BX], -1000 * start[120][BX], -1000 * start[240][BX]),
(-1000 * end[0][BX], -1000 * end[120][BX], -1000 * end[240][BX]),
)
y = (1000 * start[0][FY], 1000 * end[0][FY])
z = (
(-1000 * start[0][BZ], -1000 * start[120][BZ], -1000 * start[240][BZ]),
(-1000 * end[0][BZ], -1000 * end[120][BZ], -1000 * end[240][BZ]),
)
if jungfrau_detector.is_running_detector():
if not self.check_jungfrau_save():
return
saveRaw = jungfrau_detector.is_saving_data()
_log.debug(f"x = {x}")
_log.debug(f"y = {y}")
_log.debug(f"z = {z}")
oscillationAngle = settings.value("oscillationAngle", type=float)
exposureTime = 1000 * settings.value("exposureTime", type=float)
blastRadius = settings.value("blastRadius", type=float)
totalRange = num_v * num_h * oscillationAngle
# sync_mode : default=2
# 0 : no sync at all
# 1 : synchronize start
# 2 : synchronize start and adapt motion speed
# this function generates the code blocks:
# self.sync_wait and self.sync_run
# sync_wait can be put in the program to force a timing sync
# sync_run are the commands to run the whole program
# sync_flag if not using jungfrau =1 otherwise =0
# D.O. helical.meta.update(sync_mode=2, sync_flag=1)
helical.meta.update(sync_mode=0, sync_flag=0)
helical.calcParam(x=x, y=y, z=z)
helical.setup_gather()
helical.setup_sync()
helical.setup_coord_trf()
mode = 1
hRng = (-blastRadius * num_h, blastRadius * num_h)
w_start = 1000 * htab.startAngle()
wRng = (w_start, w_start + (totalRange * 1000))
_log.info(
f"helical params mode={mode}, cnthor={num_h}, cntvert={num_v}, hrng={hRng}, wrng={wRng}"
)
helical.setup_motion(
mode=mode,
cntHor=num_h,
cntVert=num_v,
hRng=hRng,
wRng=wRng,
pt2pt_time=exposureTime,
) # hRng in micrometers
helical.run()
try:
with open(folders.get_prefixed_file("_helical_debug.cmd"), "w") as fd:
fd.write("calcParam(x={}, y={}, z={})".format(x, y, z))
except:
pass
self.qprogress = QProgressDialog(self)
self.qprogress.setRange(0, 0)
self.qprogress.setLabelText("Acquiring HELICAL")
self.qprogress.setCancelButtonText("Abort Acquisition")
self.qprogress.canceled.connect(self.complete_daq)
self.qprogress.setWindowModality(Qt.WindowModal)
self.qprogress.setAutoClose(True)
self.qprogress.show()
sequencer_steps = [lambda: self.grids_pause_stage_tracking()]
n_frames = num_h * num_v
if jungfrau_detector.is_running_detector():
uid = settings.value(EXPERIMENT_UID, type=int)
backend_extras = self.jungfrau.get_parameters()
backend_extras.update(
{
"swissmx_trajectory_method": "grid",
"swissmx_trajectory_details_1": [-num_h],
"swissmx_trajectory_details_2": [num_v],
}
)
jungfrau_detector.set_number_of_frames(n_frames)
jungfrau_detector.set_data_owner_uid(uid)
sequencer_steps.extend(
[
lambda: jungfrau_detector.configure(
n_frames=n_frames,
outfile=folders.prefix,
outdir=folders.raw_folder,
uid=uid,
backend_extras=backend_extras,
),
lambda: jungfrau_detector.arm(),
]
)
sequencer_steps.append(lambda: helical.wait_armed())
if settings.value(ACTIVATE_PULSE_PICKER):
sequencer_steps.extend([lambda: pulsePicker.open(), lambda: pend_event(0.1)])
sequencer_steps.append(lambda: helical.trigger())
def _OLD_motion_progress():
while True:
p = helical.progress()
if p < 0:
break
time.sleep(0.1)
self.qprogress.setLabelText(f"Acquiring HELICAL {p:.0f} / {n_frames}")
_log.warning(f"helical motion complete!")
sequencer_steps.append(motion_progress)
if settings.value(ACTIVATE_PULSE_PICKER):
sequencer_steps.append(lambda: pulsePicker.close())
sequencer_steps.append(lambda: self.grids_start_stage_tracking())
self.sequencer = Sequencer(steps=sequencer_steps)
self._thread = QThread()
self._thread.setObjectName("acquisition_thread")
self.sequencer.moveToThread(self._thread)
self.sequencer.finished.connect(self.daq_helical_collect_wrapup)
self._thread.started.connect(self.sequencer.run_sequence)
self._thread.start()
def _OLD_daq_helical_collect_wrapup(self):
try:
self.qprogress.reset()
except:
pass
if self._thread.isRunning():
self._thread.quit()
helical.gather_upload(os.path.join(folders.res_folder, folders.prefix + ".npz"))
self.re_connect_collect_button()
jungfrau_detector.abort()
if option(ACTIVATE_PULSE_PICKER) or not option(SKIP_ESCAPE_TRANSITIONS_IF_SAFE):
self.cb_esc_sample_alignment()
self.increaseRunNumberRequest.emit()
if option(DELTATAU_SHOW_PLOTS):
hsgui = HelicalScanGui(helical)
hsgui.interactive_anim()
def _OLD_complete_daq(self):
_log.info("daq completed: cleaning up")
try:
self.qprogress.reset()
except:
pass
try:
if self._thread.isRunning():
self._thread.quit()
except:
pass
finally:
self.abort_measurement()
self.re_connect_collect_button()
def _OLD_re_connect_collect_button(self, callback=None, **kwargs):
_log.debug("re_connect_collect_button() {} => {}".format(callback, kwargs))
return
# button = self._button_collect
# if callback is None:
# callback = self.execute_collection
# button.setAccessibleName("collect_button")
# kwargs["label"] = "Collect"
#
# try:
# button.disconnect()
# finally:
# button.clicked.connect(callback)
#
# if "accessibleName" in kwargs:
# button.setAccessibleName(kwargs["accessibleName"])
#
# if "label" in kwargs:
# button.setText(kwargs["label"])
# self.load_stylesheet()
def _OLD_collect_abort_grid(self):
self._is_aborted = True
try:
self._eigerwaitthread._is_aborted = True
except:
pass
_log.warning("aborting grid scan")
self.abort_measurement()
delta_tau.abort()
jungfrau_detector.disarm()
self.re_connect_collect_button()
def _OLD_create_escape_toolbar(self):
cont = self._wd_right
w = QGroupBox("Escape")
layout = QHBoxLayout()
w.setLayout(layout)
but = QPushButton("Exchange\nSample")
but.setAccessibleName("escape_button_se")
but.setObjectName("action_SampleExchange")
but.clicked.connect(self.cb_esc_sample_exchange)
layout.addWidget(but)
but = QPushButton("Alignment")
but.setAccessibleName("escape_button_sa")
but.clicked.connect(self.cb_esc_sample_alignment)
layout.addWidget(but)
but = QPushButton("Collection")
but.setAccessibleName("escape_button_dc")
but.clicked.connect(self.cb_esc_data_collection)
layout.addWidget(but)
cont.layout().addWidget(w)
def _OLD_daq_method_prelocated_remove_markers(self):
try:
for m in self._marker_rois:
m.disconnect_signals()
self.vb.removeItem(m)
except Exception as e:
_log.warning("maybe failed removing markers: {}".format(e))
self._marker_rois = []
def _OLD_pause_all_markers(self):
for m in self._marker_rois:
m.disconnect_signals()
def _OLD_resume_all_markers(self):
for m in self._marker_rois:
m.reconnect_signals()
def _OLD_daq_method_prelocated_update_markers(self):
self.daq_method_prelocated_remove_markers()
data = self.prelocationModule.get_data()
add_xtals = self.prelocationModule._xtals_transformed
draw_xtals = self.prelocationModule.set_draw_crystal_marks
vb = self.vb
self._marker_rois = []
ppm = self.getPpm()
for n, cc in enumerate(data):
is_fiducial, gx, gy, cx, cy, ox, oy = cc
if not is_fiducial:
if not (add_xtals and draw_xtals):
continue
_log.debug(f"adding {'fiducial' if is_fiducial else 'xtal'} mark #{n}: {is_fiducial} {gx:.3f}, {gy:.3f}, {cx:.1f}, {cy:.1f}")
m = CstROI.CrystalCircle(
gonio_xy=(gx, gy),
parent=self,
model_row_index=n,
is_fiducial=is_fiducial,
ppm=ppm,
)
# m.sigRegionChangeFinished.connect(lambda roi=m: self.daq_method_prelocated_update_model(roi))
self._marker_rois.append(m)
vb.addItem(m)
for c in self._marker_rois:
c.reconnect_signals()
c.follow_stage()
def _OLD_daq_method_prelocated_set_fiducial(self, camx, camy, gx, gy):
_log.debug(f"camx, camy: {camx}, {camy}, fx, fy: {gx, gy}")
self.prelocationModule.set_fiducial_coords(camx, camy, gx, gy)
def _OLD_daq_method_prelocated_append_data(self, x, y, gx, gy):
_log.debug("appending to model: {} {}".format((x, y), (gx, gy)))
self.prelocationModule.append_data((x, y, gx, gy))
self.daq_method_prelocated_update_markers()
def _OLD_daq_method_prelocated_update_model(self, roi):
row = roi.get_model_row()
pos = roi.pos()
self.prelocationModule.set_data_camera(row, pos)
_log.debug("updating row {} => {}".format(row, pos))
def _OLD_daq_method_prelocated_add_reference(self):
self._references.append(pg.CircleROI())
def _OLD_move_fast_stage(self, x, y):
_log.info(f"received request to move gonio to x, y = {x:.3f}, {y:.3f} mm")
fx_motor, fy_motor, bx_motor, bz_motor, omega_motor = self.get_gonio_tweakers()
fx_motor.move_abs(x)
fy_motor.move_abs(y)
def _OLD_toggle_maximized(self):
if self.isMaximized():
self.showNormal()
else:
self.showMaximized()
def _OLD_show_window_configuration(self):
_log.debug("maximized? {}".format(self.isMaximized()))
def _OLD_update_user_and_storage(self):
global folders
pgroup = settings.value(EXPERIMENT_PGROUP, "not_set_yet")
diag = GenericDialog( title="P-group for experiment", message="Enter the p-group to be used",
inputs={EXPERIMENT_PGROUP: ("P-group", pgroup, QLineEdit(pgroup))}, use_buttons=False,)
diag.setModal(True)
run_user = getpass.getuser()
pgrp_re = re.compile(r"^p(?P<uid>\d{5})$")
if not self.is_recently_active():
diag.exec()
results = diag.results
for k, v in results.items():
m = pgrp_re.match(v)
if m:
settings.setValue(k, v)
settings.setValue(EXPERIMENT_UID, int(m["uid"]))
else:
QMessageBox.critical(self, "wrong P-group format", "P-groups are in the format:\n\n\t\tp?????\n\n\twhere ? = digit",)
self.update_user_and_storage()
return
settings.sync()
folders.set_pgroup(settings.value(EXPERIMENT_PGROUP))
try:
folders.check_permissons()
except:
folder = folders.pgroup_folder
pgroup = settings.value(EXPERIMENT_PGROUP)
box = QMessageBox()
box.setText("No Write Permission!")
box.setInformativeText("User {} has no write access to p-group {} folder:\n{}\n".format(run_user, pgroup, folder))
box.setIcon(QMessageBox.Warning)
box.setDetailedText("The folder /sf/bernina/data/{pgroup}/res/ has to writable by the user {user}, currently running the SwissMX app.".format(pgroup=pgroup, user=run_user))
btIgnore = box.addButton("Ignore", QMessageBox.NoRole)
btRetry = box.addButton("Le'me try again", QMessageBox.YesRole)
box.exec_()
ans = box.clickedButton()
if ans == btRetry:
self.update_user_and_storage()
elif ans == btIgnore:
_log.warning("no write access to pgroup but user didn't care")
self._label_pgroup.setText(settings.value(EXPERIMENT_PGROUP))
def _OLD_is_recently_active(self):
last_active = settings.value("last_active", 0, type=float)
minutes_since_last = int((time.time() - last_active) / 60.0)
return minutes_since_last < 60
def _OLD_openPreferencesDialog(self):
PreferencesDialog(self).exec_()
def _OLD_set_app_icon(self):
scriptDir = os.path.dirname(os.path.realpath(__file__))
self.setWindowIcon(QtGui.QIcon(os.path.join(scriptDir + os.path.sep + "logo.png")))
def _OLD_set_widget_background_color(self, color):
"""change a widget's color
:param color:
:return:
"""
try:
color = QtGui.QColor.colorNames().index(color)
except:
return
p = self.palette()
p.setColor(self.backgroundRole(), color)
self.setPalette(p)
# functions are prefixed with _OLD_
def _OLD_build_daq_methods_grid_tab(self):
self._grids = []
tab = self._tab_daq_method_grid
layout = tab.layout() # gridlayout
self._sb_grid_x_step.setValue(30)
self._sb_grid_y_step.setValue(30)
self._bt_add_grid.clicked.connect(self._OLD_daq_grid_add_grid)
self._bt_remove_all_grids.clicked.connect(self._OLD_daq_grid_remove_all)
self._find_targets_from_microscope_image.clicked.connect(self._OLD_daq_grid_findxtals)
self.addGridRequest.connect(self._OLD_daq_grid_add_grid)
self.gridUpdated.connect(self._OLD_daq_grid_update)
def _OLD_build_daq_methods_prelocated_tab(self):
tab = self._tab_daq_method_prelocated
self.prelocationModule = PrelocatedCoordinatesModel.PrelocatedCoordinates(parent=self)
tab.layout().addWidget(self.prelocationModule)
self.prelocationModule.prefixSelected.connect(lambda prefix: self._le_prefix.setText(prefix))
self.prelocationModule.dataFileLoaded.connect(self._OLD_daq_method_prelocated_update_markers)
self.prelocationModule.prelocatedDataUpdated.connect(self._OLD_daq_method_prelocated_update_markers)
self.prelocationModule.markersDeleted.connect(self._OLD_daq_method_prelocated_remove_markers)
self.fiducialPositionSelected.connect(self._OLD_daq_method_prelocated_set_fiducial)
self.appendPrelocatedPosition.connect(self._OLD_daq_method_prelocated_append_data)
self.prelocationModule.moveFastStageRequest.connect(self._OLD_move_fast_stage)
self._preloc_inspect_area = QPlainTextEdit()
tab.layout().addWidget(self._preloc_inspect_area)
def _OLD_build_daq_methods_embl_tab(self):
app = QApplication.instance()
self._tab_daq_method_embl.setLayout(QVBoxLayout())
layout = self._tab_daq_method_embl.layout()
#motors = self.get_gonio_motors()
self._embl_module = EmblWidget(self) #ZAC: orig. code
#self._embl_module.configure(motors, app._camera, app._zoom)
layout.addWidget(self._embl_module)
def _OLD_create_helical_widgets(self):
tbox = self._helical_tablebox
htab = self._helical_scan_table = HelicalTableWidget()
htab.gonioMoveRequest.connect(self._OLD_move_gonio_to_position)
tbox.setLayout(QVBoxLayout())
grp = QWidget()
grp.setLayout(QFormLayout())
le = QSpinBox()
le.setRange(1, 100)
le.setValue(htab.scanHorizontalCount())
le.valueChanged.connect(lambda cnt: htab.setScanHorizontalCount(cnt))
grp.layout().addRow("Horizontal Count", le)
le = QSpinBox()
le.setRange(1, 100)
le.setValue(htab.scanVerticalCount())
le.valueChanged.connect(lambda cnt: htab.setScanVerticalCount(cnt))
grp.layout().addRow("Vertical Count", le)
le = QDoubleSpinBox()
le.setRange(-180.0, 180.0)
le.setSingleStep(5.0)
le.setSuffix(" degrees")
le.valueChanged.connect(htab.setStartAngle)
grp.layout().addRow("Start angle", le)
tbox.layout().addWidget(grp)
widgi = QWidget()
widgi.setLayout(QHBoxLayout())
tbox.layout().addWidget(widgi)
but = QPushButton("Add Crystal")
but.clicked.connect(htab.add_xtal)
widgi.layout().addWidget(but)
but = QPushButton("Set START")
but.clicked.connect(lambda: htab.set_xtal_start(self.get_gonio_positions()))
widgi.layout().addWidget(but)
but = QPushButton("Set END")
but.clicked.connect(lambda: htab.set_xtal_end(self.get_gonio_positions()))
widgi.layout().addWidget(but)
tbox.layout().addWidget(htab)
def _OLD_add_beam_marker(self):
app = QApplication.instance()
cfg = app._cfg
w, h = cfg.value(AppCfg.GEO_BEAM_SZ)
self._beammark = bm = CstROI.BeamMark([100, 100], (int(w), int(h)), parent=self)
tr=QtGui.QTransform() # prepare ImageItem transformation:
tr.rotate(30)
bm.setTransform(tr) # assign transform
self.vb.addItem(self._beammark)
bm=UsrGO.BeamMark([50, 120], [30, 20])
self.vb.addItem(bm)
vi=UsrGO.Grid((120, -100), (200, 150), (30, 20), 2)
self.vb.addItem(vi)
def _OLD_camera_pause_toggle(self):
app=QApplication.instance()
app._camera.pause()
def _OLD_init_settings_tracker(self):
app=QApplication.instance()
cfg=app._cfg
_log.info("configuring widget persistence")
fields = {
# 'folder': (self._label_folder, str),
"project": (self._le_project, str),
"prefix": (self._le_prefix, str),
"actual_prefix": (self._label_actual_prefix, str),
"exposureTime": (self._dsb_exposure_time, float),
"oscillationAngle": (self._dsb_oscillation_step, float),
"blastRadius": (self._dsb_blast_radius, float),
}
for key, f_config in fields.items():
widget, conv = f_config
value = cfg.value(key)
try:
wset, wget = widget.setText, widget.text
_log.debug("tracking text field {}".format(key))
except AttributeError:
_log.debug("tracking {} number field {}".format(conv, key))
wset, wget = widget.setValue, lambda fget=widget.value: conv(fget())
except Exception as e:
_log.error(e)
try:
wset(conv(value))
except Exception as e:
_log.debug(e)
_log.warning('failed for "{}" updating field of type {} with {}'.format( key, type(widget), value))
finally:
# _log.debug('wget = {}; wset = {}'.format(wget, wset))
widget.editingFinished.connect(lambda w=widget, k=key, func_get=wget: self.persist_setting(k, func_get() ) )
#self.storage_cascade_prefix(None) #ZAC: orig. code
def _OLD_init_validators(self):
identifier_regex = QRegExp("[a-z-A-Z_0-9%]+")
self._le_project.setValidator(QRegExpValidator(identifier_regex, self._le_project))
self._le_prefix.setValidator(QRegExpValidator(identifier_regex, self._le_prefix))
def _OLD_wire_storage(self):
self._le_prefix.textChanged.connect(self._OLD_storage_cascade_prefix)
self._le_prefix.textChanged.connect(lambda newtext: self.prefixChanged.emit(newtext))
self._le_project.textChanged.connect(self._OLD_storage_cascade_prefix)
self._le_project.textChanged.connect(lambda newtext: self.projectChanged.emit(newtext))
self._le_prefix.editingFinished.connect(self._OLD_prepare_daq_folder)
self._le_project.editingFinished.connect(self._OLD_prepare_daq_folder)
self.increaseRunNumberRequest.connect(self._OLD_increase_run_number)
def _OLD_storage_cascade_prefix(self, val):
prefix = self._le_prefix.text()
if 0 == len(prefix):
_log.warning("empty prefix is not accepted")
self._le_prefix.setAccessibleName("invalid_input")
self._le_prefix.blockSignals(True)
self._le_prefix.setText("INVALID=>" + prefix)
QMessageBox.warning(self, "prefix is not valid", "Prefix is not valid!")
self._le_prefix.blockSignals(False)
return
else:
self._le_prefix.setAccessibleName("")
project = self._le_project.text()
folders.set_prefix(prefix)
folders.set_project(project)
folders.run = settings.value("run_number", type=int)
self._label_runnumber.setText(f"{folders.run:04d}")
self._data_folder = folders.raw_folder
self.folderChanged.emit(folders.raw_folder)
self._label_actual_prefix.setText(folders.prefix)
self._label_folder.setText(folders.raw_folder)
def _OLD_increase_run_number(self):
run = settings.value("run_number", type=int)
run += 1
settings.setValue("run_number", run)
folders.run = run
self._label_runnumber.setText(f"{run:04d}")
def _OLD_prepare_daq_folder(self):
global home, folders
prefix = folders.prefix
folder = folders.res_folder
if 0 == len(prefix):
return
try:
os.makedirs(folder, 0o750, exist_ok=True)
except:
msg = "Failed to create folder: {}".format(folder)
_log.warning(msg)
QMessageBox.warning(self, "Screenshot: failed to create folder", "Failed to create output folder for screenshot!\n\n\tScreenshot not taken!",)
raise
fname = os.path.join(folders.pgroup_folder, ".latest_raw")
try:
with open(fname, "w") as f:
f.write(folders.raw_folder)
_log.info("wrote: {}".format(fname))
except:
_log.warning("failed writing {}".format(fname))
fname = os.path.join(folders.pgroup_folder, ".latest_res")
try:
with open(fname, "w") as f:
f.write(folders.res_folder)
_log.info("wrote: {}".format(fname))
except:
_log.warning("failed writing {}".format(fname))
def _OLD_persist_setting(self, s, v):
app=QApplication.instance()
cfg=app._cfg
_log.debug("persisting {} = {}".format(s, v))
cfg.setValue(s, v)
def _OLD_method_changed(self, index):
method = self._tabs_daq_methods.currentWidget().accessibleName()
_log.info("method now => {}".format(method))
def _OLD_check_zescape(self):
msg = zescape.check()
if msg is None:
return
if "current" in msg:
_log.warning(f"current state: {self._esc_state}")
zescape.reply(self._esc_state)
elif "goto" in msg:
state = msg.split()[1].lower()
_log.warning(f"TELL requests to go to {state}")
try:
if "sampleexchange" in state:
_log.debug(
f"moving to mount with offset = {self._pin_mounting_offset}"
)
self.move_gonio_to_mount_position(offset=self._pin_mounting_offset)
elif "samplealignment" in state:
self.cb_esc_sample_alignment()
except:
zescape.reply("Maintenance")
zescape.reply(self._esc_state)
else: # JSON
data = json.loads(msg)
if "sampleName" in data:
_log.debug(f"TELL SAMPLE DATA => {data}")
self.tell2storage(data)
zescape.reply("ack")
elif "pin_offset" in data:
_log.debug(f"TELL pin offset => {data}")
self._pin_mounting_offset = data["pin_offset"]
zescape.reply("ack")
elif "get_pin_offset" in data:
_log.debug(f"TELL get pin offset => {data}")
zescape.reply_json({"pin_offset": self._pin_mounting_offset})
def _OLD_tell2storage(self, sample):
_log.debug(f"2 TELL SAMPLE DATA => {type(sample)}")
self._le_prefix.setText(sample["sampleName"])
self._le_project.setText(sample["sampleFolder"])
tstf = folders.get_prefixed_file("_newsample")
self.storage_cascade_prefix(None)
_log.warning(f"sample info: {tstf}")
def _OLD_is_task(self, task):
return task == self._active_task
def _OLD_get_task_menu(self):
pass
def _OLD_toggle_shutter(self, **kwargs):
if self._pv_shutter:
if 0 == self._pv_shutter.get():
self._pv_shutter.put(1)
self._button_shutter.setText("shutter opened\n\u2622")
else:
self._pv_shutter.put(0)
self._button_shutter.setText("shutter closed\n\u2622")
elif self._has_pulse_picker:
pulsePicker.toggle()
def _OLD_update_shutter_label(self, pvname, value, char_value, **kwargs):
if 0 == value:
self._button_shutter.setText("shutter closed")
else:
self._button_shutter.setText("shutter opened")
def _OLD_build_sample_selection_tab(self):
self._sample_selection = sample_selection.SampleSelection(self)
self._sample_selection.move_to_mount_position = (self.move_gonio_to_mount_position)
self._tab_sample_selection.setLayout(QVBoxLayout())
self._tab_sample_selection.layout().addWidget(self._sample_selection)
self._tab_sample_selection.layout().addStretch(2)
def _OLD_build_embl_group(self):
grp = QGroupBox("EMBL Acquisition")
layout = QFormLayout()
grp.setLayout(layout)
layout.addWidget(QLabel("Prefix"))
self._embl_prefix = QLineEdit("img")
layout.addWidget(self._embl_prefix)
def _OLD_abort_measurement(self, ev=None):
if settings.value(ACTIVATE_PULSE_PICKER):
pulsePicker.close()
jungfrau_detector.abort()
delta_tau.abort()
_log.debug("aborting measurement")
def _OLD_trigger_detector(self, **kwargs):
if self._pv_shutter is not None:
self._pv_shutter.put(0)
# self._eiger_button_collect.show()
# self._eiger_button_abort.hide()
# self._eiger_now_collecting_label.setText(
# "Finished sequence id: {}\n"
# "Data in: Data10/{}".format(
# self._detector_sequence_id, self._eiger_now_collecting_file
# )
# )
def _OLD_modify_camera_transform(self, t):
if t == "remove_all":
sample_camera.set_transformations([])
elif t == "undo_last":
sample_camera._transformations.pop()
#elif type(t) ==type(camera.Transforms): #ZAC: orig. code
# sample_camera.append_transform(t)
try:
label = ", ".join([t.name for t in sample_camera._transformations])
except:
label = ""
self._label_transforms.setText(label)
#settings.setValue(CAMERA_TRANSFORMATIONS, sample_camera._transformations) #ZAC: orig. code
def _OLD_roi_add_line(self):
roi = pg.LineSegmentROI(
[200, 200],
[300, 300],
pen="r",
scaleSnap=True,
translateSnap=True,
rotateSnap=True,
removable=True,
)
# roi.sigRegionChanged.connect(self.track_roi)
roi.sigRemoveRequested.connect(self.remove_roi)
self.vb.addItem(roi)
self._rois.append(roi)
def _OLD_roi_add_rect(self):
roi = pg.RectROI(
[200, 200],
[50, 50],
pen="r",
scaleSnap=True,
translateSnap=True,
rotateSnap=True,
removable=True,
)
roi.sigRegionChanged.connect(self.track_roi)
roi.sigRemoveRequested.connect(self.remove_roi)
self.vb.addItem(roi)
self._rois.append(roi)
def _OLD_remove_roi(self, roi):
self.vb.removeItem(roi)
self._rois.remove(roi)
def _OLD_prepare_microscope_page(self):
layout = self.microscope_page.layout()
container = QWidget()
hlay = QHBoxLayout()
container.setLayout(hlay)
layout.addWidget(container)
def _OLD_update_beam_marker(self, zoom_lvl):
w, h = settings.value(BEAM_SIZE)
try:
bx = self.beamx_fitter(zoom_lvl)
by = self.beamy_fitter(zoom_lvl)
ok = True
except:
ok = False
_log.warning("beam marker not defined")
return
_log.debug("updating beam mark to {:.1f}x{:.1f}".format(bx, by))
self.beamCameraCoordinatesChanged.emit(bx, by)
def _OLD_update_beam_marker_fitters(self):
if len(self._beam_markers) > 2:
_log.debug("defining beam marker")
bx = [(n, x[0]) for n, x in self._beam_markers.items()]
by = [(n, x[1]) for n, x in self._beam_markers.items()]
nbx = np.asarray(bx).T
nby = np.asarray(by).T
bx_coefs = np.polyfit(nbx[0], nbx[1], 3)
by_coefs = np.polyfit(nby[0], nby[1], 3)
_log.debug(".... beam marker X coeficients {}".format(bx_coefs))
_log.debug(".... beam marker Y coeficients {}".format(by_coefs))
self.beamx_fitter = np.poly1d(bx_coefs)
self.beamy_fitter = np.poly1d(by_coefs)
def _OLD_append_to_beam_markers(self, x, y, zoom):
self._beam_markers[zoom] = (x, y)
_log.info("beam markers {}".format(self._beam_markers))
settings.setValue(BEAM_MARKER_POSITIONS, self._beam_markers)
self.update_beam_marker_fitters()
def _OLD_remove_beam_markers(self):
self._beam_markers = {}
self.beamx_fitter = None
self.beamy_fitter = None
def _OLD_track_roi(self, roi):
x, y = roi.pos()
w, h = roi.size()
# area = roi.getArrayRegion(self._im, self.img)
# sum = np.sum(area)
# _log.info('{} => sum {}'.format((x,y), sum))
bx, by = x + w / 2., y + h / 2.
_log.info("beam pos {}".format((bx, by)))
_log.info("marker pos = {} ; size = {}".format((x, y), (w, h)))
def _OLD_toggle_mouse_tracking(self):
if self._mouse_tracking:
self.disengage_mouse_tracking()
else:
self.engage_mouse_tracking()
def _OLD_engage_mouse_tracking(self):
self.glw.scene().sigMouseMoved.connect(self.cb_mouse_move)
self.glw.scene().sigMouseMoved.emit()
self._mouse_tracking = True
def _OLD_disengage_mouse_tracking(self):
self.glw.scene().sigMouseMoved.disconnect(self.cb_mouse_move)
self._mouse_tracking = False
self._lb_coords.setText("")
def _OLD_get_beam_mark_on_camera_xy(self):
app=QApplication.instance()
z = app._zoom.get_val()
try:
bx = self.beamx_fitter(z)
by = self.beamy_fitter(z)
except:
bx, by = 500, 500
return (bx, by)
def _OLD_move_gonio_to_position(self, fx, fy, bx, bz, omega):
self.tweakers["fast_x"].motor.move(fx, wait=False, ignore_limits=True)
self.tweakers["fast_y"].motor.move(fy, wait=False, ignore_limits=True)
self.tweakers["base_x"].motor.move(bx, wait=False, ignore_limits=True)
self.tweakers["base_z"].motor.move(bz, wait=False, ignore_limits=True)
self.tweakers["omega"].motor.move(omega, wait=False, ignore_limits=True)
def _OLD_get_gonio_motors(self, as_json=False):
if as_json:
return {
"fast_x": self.tweakers["fast_x"].motor,
"fast_y": self.tweakers["fast_y"].motor,
"base_x": self.tweakers["base_x"].motor,
"base_z": self.tweakers["base_z"].motor,
"omega": self.tweakers["omega"].motor,
}
else:
return (
self.tweakers["fast_x"].motor,
self.tweakers["fast_y"].motor,
self.tweakers["base_x"].motor,
self.tweakers["base_z"].motor,
self.tweakers["omega"].motor,
)
def _OLD_get_gonio_tweakers(self):
return (
self.tweakers["fast_x"],
self.tweakers["fast_y"],
self.tweakers["base_x"],
self.tweakers["base_z"],
self.tweakers["omega"],
)
def _OLD_get_gonio_positions(self, as_json: bool = False):
fx, fy, cx, cz, omega = (
self.tweakers["fast_x"].motor,
self.tweakers["fast_y"].motor,
self.tweakers["base_x"].motor,
self.tweakers["base_z"].motor,
self.tweakers["omega"].motor,
)
a, b, c, d, e = (
fx.get_position(),
fy.get_position(),
cx.get_position(),
cz.get_position(),
omega.get_position(),
)
if as_json:
return {"fx": a, "fy": b, "bx": c, "bz": d, "omega": e}
else:
return (a, b, c, d, e)
def _OLD_escape_goToTellMountPosition(self):
self.move_gonio_to_mount_position()
self.lock_goniometer()
def _OLD_move_gonio_to_mount_position(self, offset: float = 0.0):
fx, fy, cx, cz, omega = self.get_gonio_motors()
bmark = "bookmark_0"
try:
t_fx = float(settings.value(bmark + "/mount_fx"))
t_fy = -offset + float(settings.value(bmark + "/mount_fy"))
t_cx = float(settings.value(bmark + "/mount_cx"))
t_cz = float(settings.value(bmark + "/mount_cz"))
t_omega = float(settings.value(bmark + "/mount_omega"))
except:
raise IncompleteConfiguration("TELL sample changer mount position is not configured!!!")
fx.move(t_fx, wait=True, ignore_limits=True)
fy.move(t_fy, wait=True, ignore_limits=True)
cx.move(t_cx, wait=True, ignore_limits=True)
cz.move(t_cz, wait=True, ignore_limits=True)
omega.move(t_omega, wait=True, ignore_limits=True)
app_utils.assert_motor_positions(
[
(fx, t_fx, 0.01),
(fy, t_fy, 0.01),
(cx, t_cx, 0.01),
(cz, t_cz, 0.01),
(omega, t_omega, 0.01),
]
)
self.cb_esc_sample_exchange()
def _OLD_lock_goniometer(self):
# tell.set_in_mount_position(True)
res = QMessageBox.question(self, "", "Mount a sample from console and click ok once the sample is mounted.", QMessageBox.Ok, QMessageBox.Ok,)
res = QMessageBox.question(self, "", "Is the sample is mounted?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No,)
if res in (QMessageBox.Yes, QMessageBox.No):
# tell.set_in_mount_position(False)
pass
@pyqtSlot(int)
def _OLD_saveBookmark(self, key: int):
"""save a bookmark for the corresponding key is the Qt.Key_* code """
fx, fy, cx, cz, omega = (
self.tweakers["fast_x"].motor,
self.tweakers["fast_y"].motor,
self.tweakers["base_x"].motor,
self.tweakers["base_z"].motor,
self.tweakers["omega"].motor,
)
bmark = "bookmark_{}".format(key)
if key == 0:
ans = QMessageBox.question(self, "Override TELL mount position", "This will overwrite the positions used for TELL MOUNTING!!!\n\n\tContinue ?",)
if ans != QMessageBox.Yes:
return
_log.info(
"saving bookmark {}: {}, {}, {}, {}, {}".format(
bmark,
fx.get_position(),
fy.get_position(),
cx.get_position(),
cz.get_position(),
omega.get_position(),
)
)
settings.setValue(bmark + "/mount_fx", fx.get_position())
settings.setValue(bmark + "/mount_fy", fy.get_position())
settings.setValue(bmark + "/mount_cx", cx.get_position())
settings.setValue(bmark + "/mount_cz", cz.get_position())
settings.setValue(bmark + "/mount_omega", omega.get_position())
def _OLD_gotoBookmark(self, key: int):
"""save a bookmark for the corresponding key"""
fx, fy, cx, cz, omega = (
self.tweakers["fast_x"].motor,
self.tweakers["fast_y"].motor,
self.tweakers["base_x"].motor,
self.tweakers["base_z"].motor,
self.tweakers["omega"].motor,
)
bmark = "bookmark_{}".format(key)
try:
t_fx = float(settings.value(bmark + "/mount_fx"))
t_fy = float(settings.value(bmark + "/mount_fy"))
t_cx = float(settings.value(bmark + "/mount_cx"))
t_cz = float(settings.value(bmark + "/mount_cz"))
t_omega = float(settings.value(bmark + "/mount_omega"))
except:
return
fx.move(t_fx, wait=True, ignore_limits=True)
fy.move(t_fy, wait=True, ignore_limits=True)
cx.move(t_cx, wait=True, ignore_limits=True)
cz.move(t_cz, wait=True, ignore_limits=True)
omega.move(t_omega, wait=True, ignore_limits=True)
def _OLD_move_cryojet_nozzle(self, pos):
cx = self.tweakers["cryo"]
if "in" == pos.lower():
key = CRYOJET_NOZZLE_IN
elif "out" == pos.lower():
key = CRYOJET_NOZZLE_OUT
to_pos = settings.value(key, 1e10, type=float)
if to_pos > 1e9:
raise IncompleteConfiguration(f"CRYOJET configuration is incomplete! Missing {key}")
cx.move_abs(to_pos, assert_position=True)
def _OLD_build_cryo_group(self, toolbox):
pfx=QApplication.instance()._cfg.value(AppCfg.GBL_DEV_PREFIX)[0]
qutilities.add_item_to_toolbox(
toolbox,
"Cryojet",
widget_list=[
self.get_tweaker(f"{pfx}:MOT_CRYO", alias="cryo", label="cryo X")
],
)
def _OLD_build_wegde_group(self, toolbox):
pfx=QApplication.instance()._cfg.value(AppCfg.GBL_DEV_PREFIX)[0]
qutilities.add_item_to_toolbox(toolbox,"Wedge Mover",
widget_list=[
self.get_tweaker(f"{pfx}:MOT_WEDGE1", alias="wedge_1", label="wedge_1"),
self.get_tweaker(f"{pfx}:MOT_WEDGE2", alias="wedge_2", label="wedge_2"),
self.get_tweaker(f"{pfx}:MOT_WEDGE3", alias="wedge_3", label="wedge_3"),
self.get_tweaker(f"{pfx}:MOT_WEDGE4", alias="wedge_4", label="wedge_4"),
self.get_tweaker(f"{pfx}:MOT_WEDGEX", alias="wedge_x", label="wedge_x"),
self.get_tweaker(f"{pfx}:MOT_WEDGEY", alias="wedge_y", label="wedge_y"),
self.get_tweaker(f"{pfx}:MOT_WEDGEA", alias="wedge_a", label="wedge_a"),
self.get_tweaker(f"{pfx}:MOT_WEDGEB", alias="wedge_b", label="wedge_b"),
],
)
def _OLD_build_slits_group(self, toolbox):
pfx=QApplication.instance()._cfg.value(AppCfg.GBL_DEV_PREFIX)[0]
qutilities.add_item_to_toolbox(
toolbox,
"Slits",
widget_list=[
self.get_tweaker(f"{pfx}10", alias="slit_right", label="left", mtype=1, ),
self.get_tweaker(f"{pfx}11", alias="slit_left", label="right", mtype=1,),
self.get_tweaker(f"{pfx}12", alias="slit_bottom", label="bottom", mtype=1,),
self.get_tweaker(f"{pfx}13",alias="slit_top",label="top",mtype=1,),
],
)
def _OLD_assert_post_tube_position(self, pos):
x_up = settings.value("post_sample_tube/x_up", 1e10, type=float)
y_up = settings.value("post_sample_tube/y_up", 1e10, type=float)
x_down = settings.value("post_sample_tube/x_down", 1e10, type=float)
y_down = settings.value("post_sample_tube/y_down", 1e10, type=float)
dx = settings.value("post_sample_tube/dx", 1e10, type=float)
dy = settings.value("post_sample_tube/dy", 1e10, type=float)
tz_in = settings.value("post_sample_tube/z_in", 1e10, type=float)
tz_out = settings.value("post_sample_tube/z_out", 1e10, type=float)
if (x_up + y_up + x_down + y_down + dx + dy + tz_in + tz_out) > 1e9:
raise Exception("miscofingured positions for post-sample tube")
usy = self.tweakers["tube_usy"]
dsy = self.tweakers["tube_dsy"]
usx = self.tweakers["tube_usx"]
dsx = self.tweakers["tube_dsx"]
tbz = self.tweakers["tube_z"]
if pos == "in":
yu = y_up
xu = x_up
yd = y_down
xd = x_down
z = tz_in
elif pos == "out":
yu = y_up + dy
xu = x_up + dx
yd = y_down + dy
xd = x_down + dx
z = tz_out
app_utils.assert_tweaker_positions([
(usy, yu, 0.1),
(dsy, yd, 0.1),
(usx, xu, 0.1),
(dsx, xd, 0.1),
(tbz, z, 0.1),],timeout=2.0,
)
def _OLD_add_tweaker(self, pv, alias=None, label=None, mtype="epics_motor", layout=None):
if layout is None:
layout = self._tweak_container.layout()
if mtype == "epics_motor":
m = MotorTweak()
else:
m = SmaractMotorTweak()
layout.addWidget(m)
m.connect_motor(pv, label)
self.tweakers[alias] = m
def _OLD_done_sliding(self):
print("done sliding at {}".format(self.slider_fast_x.value()))
def _OLD_daq_grid_add_grid(self, gx=None, gy=None):
grid_index = len(self._grids)
if gx in (False, None):
gx=self.tweakers["fast_x"].get_rbv()
gy=self.tweakers["fast_y"].get_rbv()
xstep = self._sb_grid_x_step.value()
ystep = self._sb_grid_y_step.value()
xoffset = self._sb_grid_x_offset.value()
yoffset = self._sb_grid_y_offset.value()
app=QApplication.instance()
geo=app._geometry
oc=geo._opt_ctr
if xstep==0:
go=UsrGO.Grid((120, -100), (200, 150), (30, 22), 2)
elif xstep==1:
go=UsrGO.FixTargetFrame((120, -100), (200, 150), tpl='test')
elif xstep==2:
v=geo.pos2pix((12.5, 0))
l=np.linalg.norm(v)
go=UsrGO.FixTargetFrame(-oc, (l,l), tpl='12.5x12.5')
elif xstep==3:
v=geo.pos2pix((23, 0))
l=np.linalg.norm(v)
go=UsrGO.FixTargetFrame(-oc, (l,l), tpl='23.0x23.0')
else:
_log.error('set xstep 0..2 for tests')
self.vb.addItem(go)
self._goTracked['objLst'].append(go)
#grid = CstROI.Grid( x_step=xstep, y_step=ystep, x_offset=xoffset, y_offset=yoffset, gonio_xy=(gx, gy), grid_index=grid_index, parent=self,)
#self.vb.addItem(grid)
#grid.calculate_gonio_xy()
#grid.sigRemoveRequested.connect(lambda g=grid: self.remove_grid(g))
def _OLD_daq_grid_remove_all(self):
vb=self.vb
for go in self._goTracked['objLst']:
vb.removeItem(go)
self._goTracked['objLst']=[]
def _OLD_grids_pause_stage_tracking(self):
for grid in self._grids:
grid.disable_stage_tracking()
def _OLD_grids_start_stage_tracking(self):
for grid in self._grids:
grid.enable_stage_tracking()
def _OLD_remove_grid(self, grid):
self.vb.removeItem(grid)
self._grids.remove(grid)
def _OLD_daq_grid_update(self, grid_index):
try:
grid = self._grids[grid_index]
except:
print("grid index not yet there")
return
points = grid.get_grid_targets()
num_points = len(points)
etime = float(settings.value("exposureTime"))
doc = f"grid_{grid_index} = ["
for n, pos in enumerate(points):
x, y = pos
doc += "[{:8.3f}, {:8.3f}],\n".format(x, y)
doc += "]"
self._grid_inspect_area.setPlainText(doc)
m = "Number of points: {}\nEstimated Time: {:.1f} minutes".format(num_points, num_points * etime / 60.)
self._label_grid_parameters.setText(m)
def _OLD_daq_embl_collect_points(self):
coords = self._embl_module.coords
points = [[x, y] for x, y, bx, bz, o in coords]
points = np.array(points)
method = "trajectory"
xp = (1000 * points).astype(int) # microns then round int
params = (xp[:, 0].tolist(), xp[:, 1].tolist())
self.daq_collect_points(points, visualizer_method=method, visualizer_params=params)
def _OLD_daq_prelocated_collect_points(self):
points = []
data = self.prelocationModule.get_collection_targets()
for n, cc in enumerate(data):
is_fiducial, gx, gy, cx, cy, ox, oy = cc
points.append([gx, gy])
points = np.array(points)
method = "trajectory"
xp = (1000 * points).astype(int) # microns then round int
params = (xp[:, 0].tolist(), xp[:, 1].tolist())
self.daq_collect_points(points, visualizer_method=method, visualizer_params=params)
def _OLD_daq_grid_findxtals(self):
feature_size = self._sb_findxtals_feature_size.value()
image = sample_camera.get_image()
findObj(-image, objSize=feature_size, viz=1)
def _OLD_check_jungfrau_save(self) -> bool:
if jungfrau_detector.is_running_detector():
saveRaw = jungfrau_detector.is_saving_data()
if not saveRaw:
box = QMessageBox()
box.setText("Jungfrau save data disabled!")
box.setInformativeText("Jungfrau save data is disabled!")
box.setIcon(QMessageBox.Warning)
box.setDetailedText("Choose to abort, enable and continue, or continue without saving raw data")
btContinue = box.addButton("Continue", QMessageBox.YesRole)
btAbort = box.addButton("OMG! Abort", QMessageBox.NoRole)
btEnable = box.addButton("Enable save and continue", QMessageBox.YesRole)
box.exec_()
ans = box.clickedButton()
if ans == btEnable:
jungfrau_detector.set_save_raw(True)
return True
elif ans == btAbort:
_log.info("not doing helical scan")
return False
return True
return True
def _OLD_daq_collect_points(self, points, visualizer_method, visualizer_params):
app = QApplication.instance()
cfg = app._cfg
verbose=0xff
fn='/tmp/shapepath'
try:
dt=app._deltatau
except AttributeError:
app._deltatau=dt=deltatau.Deltatau()
try:
jf=app._jungfrau
except AttributeError:
app._jungfrau=jf=detector.Jungfrau()
sp=dt._shapepath
sp.gen_grid_points(w=15, h=15, pitch=3, rnd=0, ofs=(0, +2000))
#sp.gen_grid_points(w=5, h=10, pitch=1, rnd=0, ofs=(0, 0));sp.sort_points(False, 10);sp.points
sp.sort_points(False, 15);
sp.meta['pt2pt_time']=10
sp.setup_gather()
sp.setup_sync(verbose=verbose&32, timeOfs=0.05)
sp.setup_coord_trf() # reset to shape path system
# sp.meta['pt2pt_time']=10 #put between setup_sync and setup_motion to have more motion points than FEL syncs
sp.setup_motion(fnPrg=fn+'.prg', mode=3, scale=1., dwell=10)
sp.homing() # homing if needed
sp.run() # start motion program
sp.wait_armed() # wait until motors are at first position
sp.trigger(0.5) # send a start trigger (if needed) ater given time
if not dt._comm is None:
while True:
p=int(sp.progress())
if p<0: break
#print('progress %d/%d'%(p, num_pts))
time.sleep(.1)
sp.gather_upload(fnRec=fn+'.npz')
#dp=deltatau.shapepath.DebugPlot(sp)
#dp.plot_gather(mode=11)
#plt.show(block=False)
#plt.show(block=True)
return
task = self.active_task()
XDIR = -1
#folders.make_if_needed()
#if ( cfg.option(AppCfg.ACTIVATE_PULSE_PICKER) and not jungfrau_detector.is_running_detector()):
# if QMessageBox.No == QMessageBox.question(self, "X-rays but no Jungfrau",
# "X-rays will be used bu the Jungfrau will not run.\n\n\tContinue?",):
# _log.warning("user forgot to turn on the jungfrau")
# return
#if option(ACTIVATE_PULSE_PICKER) or not option(SKIP_ESCAPE_TRANSITIONS_IF_SAFE):
# self.escape_goToDataCollection()
points *= 1000 # delta tau uses micrometers
points[:, 0] *= XDIR # fast X axis is reversed
# sync_mode : default=2
# 0 : no sync at all
# 1 : synchronize start
# 2 : synchronize start and adapt motion speed
# this function generates the code blocks:
# self.sync_wait and self.sync_run
# sync_wait can be put in the program to force a timing sync
# sync_run are the commands to run the whole program
# sync_flag if not using jungfrau =1 otherwise =0
# D.O. shapepath.meta.update(sync_mode=2, sync_flag=1)
sp.meta.update(sync_mode=0, sync_flag=0)
maxacq_points = 116508
samp_time = 0.0002 # s
overhead_time = 0.1
etime=10
vscale=1.0
#etime = settings.value("exposureTime", type=float)
#vscale = settings.value(DELTATAU_VELOCITY_SCALE, 1.0, type=float)
#sort_points = option(DELTATAU_SORT_POINTS)
acq_per = int(np.ceil((etime * len(points) + overhead_time) / (maxacq_points * samp_time)))
_log.info(f"gather acquisotion period = {acq_per}")
_log.info(f"velocity scale {vscale}")
sp.setup_gather(acq_per=acq_per)
sp.setup_sync(verbose=True)
sp.setup_coord_trf()
assert(points.dtcfgype==np.float64)
sp.points = points
if TASK_GRID == task:
# width, height = visualizer_params
# _log.debug(f"grid: {width} x {height}")
# details_1 = [width]
# details_2 = [height]
# sp.sort_points(xy=False, grp_sz=height)
pass
elif task in (TASK_PRELOCATED, TASK_EMBL):
if sort_points:
shapepath.sort_points()
self.daq_method_prelocated_remove_markers()
details_1, details_2 = visualizer_params
sp.setup_motion(
mode=3, # 1 = bad pvt 3 = pft (pvt via inverse fourier transform)
pt2pt_time=etime * 1000.,
#fnPrg=folders.get_prefixed_file("_program.prg"),
scale=vscale, # velocity at target position scaling: 1=optimal speed, 0=zero speed
dwell=10, # milli-seconds wait
)
sp.run()
self.qprogress = QProgressDialog(self)
self.qprogress.setRange(0, 0)
self.qprogress.setLabelText("Acquiring GRID")
self.qprogress.setCancelButtonText("Abort Acquisition")
self.qprogress.canceled.connect(self.complete_daq)
self.qprogress.setWindowModality(Qt.WindowModal)
self.qprogress.setAutoClose(True)
self.qprogress.show()
sequencer_steps = [lambda: self.grids_pause_stage_tracking()]
if jungfrau_detector.is_running_detector():
if not self.check_jungfrau_save():
# user aborted run from save data dialog
return
n_frames = ntrigger
uid = settings.value(EXPERIMENT_UID, type=int)
backend_extras = self.jungfrau.get_parameters()
backend_extras.update(
{
"swissmx_trajectory_method": visualizer_method,
"swissmx_trajectory_details_1": details_1,
"swissmx_trajectory_details_2": details_2,
}
)
jungfrau_detector.set_number_of_frames(n_frames)
jungfrau_detector.set_data_owner_uid(uid)
sequencer_steps.extend(
[
lambda: jungfrau_detector.configure(
n_frames=n_frames,
outfile=folders.prefix,
outdir=folders.raw_folder,
uid=uid,
backend_extras=backend_extras,
),
lambda: jungfrau_detector.arm(),
]
)
sequencer_steps.append(lambda: shapepath.wait_armed())
if option(ACTIVATE_PULSE_PICKER):
sequencer_steps.append(lambda: pulsePicker.open())
# if settings.value("scanning/trigger_microscope_camera", type=bool):
# sample_camera.switch_to_trigger(True)
sequencer_steps.append(lambda: shapepath.trigger(wait=0.5))
def _OLD_shapepath_progress():
while True:
p = shapepath.progress()
if p < 0:
break
time.sleep(0.1)
self.qprogress.setLabelText(f"Acquiring GRID {p:.0f} / {ntrigger}")
_log.warning(f"motion complete!")
# sample_camera.stop_camera()
# sample_camera.switch_to_trigger(False)
# sample_camera.save_buffer_series(folders.prefix)
sequencer_steps.append(shapepath_progress)
if option(ACTIVATE_PULSE_PICKER):
sequencer_steps.append(lambda: pulsePicker.close())
sequencer_steps.append(lambda: jungfrau_detector.wait_finished())
sequencer_steps.append(lambda: self.grids_start_stage_tracking())
self.sequencer = Sequencer(steps=sequencer_steps)
self._thread = QThread()
self._thread.setObjectName("acquisition_thread")
self.sequencer.moveToThread(self._thread)
self.sequencer.finished.connect(self.daq_collect_points_wrapup)
self._thread.started.connect(self.sequencer.run_sequence)
self._thread.start()
def _OLD_run_steps(self, steps, title, at_end=None, cancelable=False):
dlg = QProgressDialog(self)
dlg.setWindowTitle(title)
dlg.setWindowModality(Qt.WindowModal)
dlg.setMinimumDuration(0)
if not cancelable:
dlg.setCancelButton(None)
dlg.setRange(0, 0)
dlg.setLabelText(f"<b>{title}</b><br/>")
dlg.setAutoClose(True)
dlg.show()
dlg.setValue(random.randint(1, 20))
class Runner(QObject):
finito = pyqtSignal()
timeoutExpired = pyqtSignal()
errorHappened = pyqtSignal(str)
result = pyqtSignal(str)
def __init__(self, step_to_run):
super().__init__()
self.step = step_to_run
self.exception = None
self.done = False
def run(self):
try:
self.step()
except Exception as e:
_log.debug(" +> step exception")
self.exception = str(e)
self.errorHappened.emit(str(e))
self.finito.emit()
for n, step in enumerate(steps):
_log.info(f"running step {step.title}")
dlg.setLabelText(f"<b>{title}</b><br/><font color=\"blue\">{step.title}</font>")
dlg.setValue(n)
thread = QThread()
runner = Runner(step)
runner.moveToThread(thread)
thread.started.connect(runner.run)
runner.finito.connect(thread.quit)
thread.start()
while thread.isRunning():
dlg.setValue(random.randint(1, 20))
time.sleep(0.01)
if dlg.wasCanceled():
# FIXME ensure we abort the running thread
break
if dlg.wasCanceled():
break
if runner.exception is not None:
QMessageBox.critical(self, step.title, str(runner.exception))
break
if dlg.wasCanceled():
_log.error(f"sequence {title} was cancelled by user")
raise AcquisitionAbortedException(f"sequence {title} was cancelled by user")
if at_end is not None:
at_end()
dlg.reset()
def _OLD_daq_collect_points_wrapup(self):
self.qprogress.reset()
if self._thread.isRunning():
self._thread.quit()
shapepath.gather_upload(os.path.join(folders.res_folder, folders.prefix + ".npz"))
if option(DELTATAU_SHOW_PLOTS):
dp = DebugPlot(shapepath)
dp.plot_gather(mode=1)
pyplot.show()
if TASK_PRELOCATED == self.active_task():
self.daq_method_prelocated_update_markers()
if option(ACTIVATE_PULSE_PICKER) or not option(SKIP_ESCAPE_TRANSITIONS_IF_SAFE):
self.cb_esc_sample_alignment()
sequence = {"delta tau program": shapepath.prg, "points": shapepath.points.tolist(), "timestamp": tdstamp(),}
sfname = folders.get_prefixed_file("_ScanInfo.json")
try:
with open(sfname, "w") as sf:
_log.info("writing scan info into: {}".format(sfname))
sf.write(json.dumps(sequence))
except:
_log.warning(f"failed to write scan info to {sfname}")
self.re_connect_collect_button()
jungfrau_detector.abort()
self.increaseRunNumberRequest.emit()
def _OLD_daq_collect_update_inspect(self, msg):
te = self._inspect
m = te.toPlainText()
te.setPlainText(m + msg + "\n")
te.verticalScrollBar().setValue(te.verticalScrollBar().maximum())
def _OLD_daq_helical_collect(self):
"""[
[{
0: (0.0, 1.238401694734829, 3.527, 0.936, 0.001),
120: (0.0, 1.3890393596274455, -1.957242824091597, -0.5639999999999998, 120.001),
240: (0.0, 1.3866130349657206, -1.1642619618562273, 3.105, 240.001)
},
{
0: (0.0, 1.238401694734829, 3.527, 0.936, 0.001),
120: (0.0, 1.3890393596274455, -1.957242824091597, -0.5639999999999998, 120.001),
240: (0.0, 1.3866130349657206, -1.1642619618562273, 3.105, 240.001)
}]
]
"""
_log.info("executing collection")
htab = self._helical_scan_table
num_h = htab.scanHorizontalCount()
num_v = htab.scanVerticalCount()
if ( settings.value(ACTIVATE_PULSE_PICKER) and not jungfrau_detector.is_running_detector()):
if QMessageBox.No == QMessageBox.question(self, "X-rays but no Jungfrau",
"X-rays will be used bu the Jungfrau will not run.\n\n\tContinue?",):
_log.warning("user forgot to turn on the jungfrau")
return
if option(ACTIVATE_PULSE_PICKER) or not option(SKIP_ESCAPE_TRANSITIONS_IF_SAFE):
self.cb_esc_data_collection()
folders.make_if_needed()
data = htab.get_data()
_log.debug(data)
start, end = data[0]
FX, FY, BX, BZ, O = range(5)
x = (
(-1000 * start[0][BX], -1000 * start[120][BX], -1000 * start[240][BX]),
(-1000 * end[0][BX], -1000 * end[120][BX], -1000 * end[240][BX]),
)
y = (1000 * start[0][FY], 1000 * end[0][FY])
z = (
(-1000 * start[0][BZ], -1000 * start[120][BZ], -1000 * start[240][BZ]),
(-1000 * end[0][BZ], -1000 * end[120][BZ], -1000 * end[240][BZ]),
)
if jungfrau_detector.is_running_detector():
if not self.check_jungfrau_save():
return
saveRaw = jungfrau_detector.is_saving_data()
_log.debug(f"x = {x}")
_log.debug(f"y = {y}")
_log.debug(f"z = {z}")
oscillationAngle = settings.value("oscillationAngle", type=float)
exposureTime = 1000 * settings.value("exposureTime", type=float)
blastRadius = settings.value("blastRadius", type=float)
totalRange = num_v * num_h * oscillationAngle
# sync_mode : default=2
# 0 : no sync at all
# 1 : synchronize start
# 2 : synchronize start and adapt motion speed
# this function generates the code blocks:
# self.sync_wait and self.sync_run
# sync_wait can be put in the program to force a timing sync
# sync_run are the commands to run the whole program
# sync_flag if not using jungfrau =1 otherwise =0
# D.O. helical.meta.update(sync_mode=2, sync_flag=1)
helical.meta.update(sync_mode=0, sync_flag=0)
helical.calcParam(x=x, y=y, z=z)
helical.setup_gather()
helical.setup_sync()
helical.setup_coord_trf()
mode = 1
hRng = (-blastRadius * num_h, blastRadius * num_h)
w_start = 1000 * htab.startAngle()
wRng = (w_start, w_start + (totalRange * 1000))
_log.info(
f"helical params mode={mode}, cnthor={num_h}, cntvert={num_v}, hrng={hRng}, wrng={wRng}"
)
helical.setup_motion(
mode=mode,
cntHor=num_h,
cntVert=num_v,
hRng=hRng,
wRng=wRng,
pt2pt_time=exposureTime,
) # hRng in micrometers
helical.run()
try:
with open(folders.get_prefixed_file("_helical_debug.cmd"), "w") as fd:
fd.write("calcParam(x={}, y={}, z={})".format(x, y, z))
except:
pass
self.qprogress = QProgressDialog(self)
self.qprogress.setRange(0, 0)
self.qprogress.setLabelText("Acquiring HELICAL")
self.qprogress.setCancelButtonText("Abort Acquisition")
self.qprogress.canceled.connect(self.complete_daq)
self.qprogress.setWindowModality(Qt.WindowModal)
self.qprogress.setAutoClose(True)
self.qprogress.show()
sequencer_steps = [lambda: self.grids_pause_stage_tracking()]
n_frames = num_h * num_v
if jungfrau_detector.is_running_detector():
uid = settings.value(EXPERIMENT_UID, type=int)
backend_extras = self.jungfrau.get_parameters()
backend_extras.update(
{
"swissmx_trajectory_method": "grid",
"swissmx_trajectory_details_1": [-num_h],
"swissmx_trajectory_details_2": [num_v],
}
)
jungfrau_detector.set_number_of_frames(n_frames)
jungfrau_detector.set_data_owner_uid(uid)
sequencer_steps.extend(
[
lambda: jungfrau_detector.configure(
n_frames=n_frames,
outfile=folders.prefix,
outdir=folders.raw_folder,
uid=uid,
backend_extras=backend_extras,
),
lambda: jungfrau_detector.arm(),
]
)
sequencer_steps.append(lambda: helical.wait_armed())
if settings.value(ACTIVATE_PULSE_PICKER):
sequencer_steps.extend([lambda: pulsePicker.open(), lambda: pend_event(0.1)])
sequencer_steps.append(lambda: helical.trigger())
def _OLD_motion_progress():
while True:
p = helical.progress()
if p < 0:
break
time.sleep(0.1)
self.qprogress.setLabelText(f"Acquiring HELICAL {p:.0f} / {n_frames}")
_log.warning(f"helical motion complete!")
sequencer_steps.append(motion_progress)
if settings.value(ACTIVATE_PULSE_PICKER):
sequencer_steps.append(lambda: pulsePicker.close())
sequencer_steps.append(lambda: self.grids_start_stage_tracking())
self.sequencer = Sequencer(steps=sequencer_steps)
self._thread = QThread()
self._thread.setObjectName("acquisition_thread")
self.sequencer.moveToThread(self._thread)
self.sequencer.finished.connect(self.daq_helical_collect_wrapup)
self._thread.started.connect(self.sequencer.run_sequence)
self._thread.start()
def _OLD_daq_helical_collect_wrapup(self):
try:
self.qprogress.reset()
except:
pass
if self._thread.isRunning():
self._thread.quit()
helical.gather_upload(os.path.join(folders.res_folder, folders.prefix + ".npz"))
self.re_connect_collect_button()
jungfrau_detector.abort()
if option(ACTIVATE_PULSE_PICKER) or not option(SKIP_ESCAPE_TRANSITIONS_IF_SAFE):
self.cb_esc_sample_alignment()
self.increaseRunNumberRequest.emit()
if option(DELTATAU_SHOW_PLOTS):
hsgui = HelicalScanGui(helical)
hsgui.interactive_anim()
def _OLD_complete_daq(self):
_log.info("daq completed: cleaning up")
try:
self.qprogress.reset()
except:
pass
try:
if self._thread.isRunning():
self._thread.quit()
except:
pass
finally:
self.abort_measurement()
self.re_connect_collect_button()
def _OLD_re_connect_collect_button(self, callback=None, **kwargs):
_log.debug("re_connect_collect_button() {} => {}".format(callback, kwargs))
return
# button = self._button_collect
# if callback is None:
# callback = self.execute_collection
# button.setAccessibleName("collect_button")
# kwargs["label"] = "Collect"
#
# try:
# button.disconnect()
# finally:
# button.clicked.connect(callback)
#
# if "accessibleName" in kwargs:
# button.setAccessibleName(kwargs["accessibleName"])
#
# if "label" in kwargs:
# button.setText(kwargs["label"])
# self.load_stylesheet()
def _OLD_collect_abort_grid(self):
self._is_aborted = True
try:
self._eigerwaitthread._is_aborted = True
except:
pass
_log.warning("aborting grid scan")
self.abort_measurement()
delta_tau.abort()
jungfrau_detector.disarm()
self.re_connect_collect_button()
def _OLD_create_escape_toolbar(self):
cont = self._wd_right
w = QGroupBox("Escape")
layout = QHBoxLayout()
w.setLayout(layout)
but = QPushButton("Exchange\nSample")
but.setAccessibleName("escape_button_se")
but.setObjectName("action_SampleExchange")
but.clicked.connect(self.cb_esc_sample_exchange)
layout.addWidget(but)
but = QPushButton("Alignment")
but.setAccessibleName("escape_button_sa")
but.clicked.connect(self.cb_esc_sample_alignment)
layout.addWidget(but)
but = QPushButton("Collection")
but.setAccessibleName("escape_button_dc")
but.clicked.connect(self.cb_esc_data_collection)
layout.addWidget(but)
cont.layout().addWidget(w)
def _OLD_daq_method_prelocated_remove_markers(self):
try:
for m in self._marker_rois:
m.disconnect_signals()
self.vb.removeItem(m)
except Exception as e:
_log.warning("maybe failed removing markers: {}".format(e))
self._marker_rois = []
def _OLD_pause_all_markers(self):
for m in self._marker_rois:
m.disconnect_signals()
def _OLD_resume_all_markers(self):
for m in self._marker_rois:
m.reconnect_signals()
def _OLD_daq_method_prelocated_update_markers(self):
self.daq_method_prelocated_remove_markers()
data = self.prelocationModule.get_data()
add_xtals = self.prelocationModule._xtals_transformed
draw_xtals = self.prelocationModule.set_draw_crystal_marks
vb = self.vb
self._marker_rois = []
ppm = self.getPpm()
for n, cc in enumerate(data):
is_fiducial, gx, gy, cx, cy, ox, oy = cc
if not is_fiducial:
if not (add_xtals and draw_xtals):
continue
_log.debug(f"adding {'fiducial' if is_fiducial else 'xtal'} mark #{n}: {is_fiducial} {gx:.3f}, {gy:.3f}, {cx:.1f}, {cy:.1f}")
m = CstROI.CrystalCircle(
gonio_xy=(gx, gy),
parent=self,
model_row_index=n,
is_fiducial=is_fiducial,
ppm=ppm,
)
# m.sigRegionChangeFinished.connect(lambda roi=m: self.daq_method_prelocated_update_model(roi))
self._marker_rois.append(m)
vb.addItem(m)
for c in self._marker_rois:
c.reconnect_signals()
c.follow_stage()
def _OLD_daq_method_prelocated_set_fiducial(self, camx, camy, gx, gy):
_log.debug(f"camx, camy: {camx}, {camy}, fx, fy: {gx, gy}")
self.prelocationModule.set_fiducial_coords(camx, camy, gx, gy)
def _OLD_daq_method_prelocated_append_data(self, x, y, gx, gy):
_log.debug("appending to model: {} {}".format((x, y), (gx, gy)))
self.prelocationModule.append_data((x, y, gx, gy))
self.daq_method_prelocated_update_markers()
def _OLD_daq_method_prelocated_update_model(self, roi):
row = roi.get_model_row()
pos = roi.pos()
self.prelocationModule.set_data_camera(row, pos)
_log.debug("updating row {} => {}".format(row, pos))
def _OLD_daq_method_prelocated_add_reference(self):
self._references.append(pg.CircleROI())
def _OLD_move_fast_stage(self, x, y):
_log.info(f"received request to move gonio to x, y = {x:.3f}, {y:.3f} mm")
fx_motor, fy_motor, bx_motor, bz_motor, omega_motor = self.get_gonio_tweakers()
fx_motor.move_abs(x)
fy_motor.move_abs(y)
def _OLD_toggle_maximized(self):
if self.isMaximized():
self.showNormal()
else:
self.showMaximized()
def _OLD_show_window_configuration(self):
_log.debug("maximized? {}".format(self.isMaximized()))
def _OLD_update_user_and_storage(self):
global folders
pgroup = settings.value(EXPERIMENT_PGROUP, "not_set_yet")
diag = GenericDialog( title="P-group for experiment", message="Enter the p-group to be used",
inputs={EXPERIMENT_PGROUP: ("P-group", pgroup, QLineEdit(pgroup))}, use_buttons=False,)
diag.setModal(True)
run_user = getpass.getuser()
pgrp_re = re.compile(r"^p(?P<uid>\d{5})$")
if not self.is_recently_active():
diag.exec()
results = diag.results
for k, v in results.items():
m = pgrp_re.match(v)
if m:
settings.setValue(k, v)
settings.setValue(EXPERIMENT_UID, int(m["uid"]))
else:
QMessageBox.critical(self, "wrong P-group format", "P-groups are in the format:\n\n\t\tp?????\n\n\twhere ? = digit",)
self.update_user_and_storage()
return
settings.sync()
folders.set_pgroup(settings.value(EXPERIMENT_PGROUP))
try:
folders.check_permissons()
except:
folder = folders.pgroup_folder
pgroup = settings.value(EXPERIMENT_PGROUP)
box = QMessageBox()
box.setText("No Write Permission!")
box.setInformativeText("User {} has no write access to p-group {} folder:\n{}\n".format(run_user, pgroup, folder))
box.setIcon(QMessageBox.Warning)
box.setDetailedText("The folder /sf/bernina/data/{pgroup}/res/ has to writable by the user {user}, currently running the SwissMX app.".format(pgroup=pgroup, user=run_user))
btIgnore = box.addButton("Ignore", QMessageBox.NoRole)
btRetry = box.addButton("Le'me try again", QMessageBox.YesRole)
box.exec_()
ans = box.clickedButton()
if ans == btRetry:
self.update_user_and_storage()
elif ans == btIgnore:
_log.warning("no write access to pgroup but user didn't care")
self._label_pgroup.setText(settings.value(EXPERIMENT_PGROUP))
def _OLD_is_recently_active(self):
last_active = settings.value("last_active", 0, type=float)
minutes_since_last = int((time.time() - last_active) / 60.0)
return minutes_since_last < 60
def _OLD_openPreferencesDialog(self):
PreferencesDialog(self).exec_()
def _OLD_set_app_icon(self):
scriptDir = os.path.dirname(os.path.realpath(__file__))
self.setWindowIcon(QtGui.QIcon(os.path.join(scriptDir + os.path.sep + "logo.png")))
def _OLD_set_widget_background_color(self, color):
"""change a widget's color
:param color:
:return:
"""
try:
color = QtGui.QColor.colorNames().index(color)
except:
return
p = self.palette()
p.setColor(self.backgroundRole(), color)
self.setPalette(p)