From c11e0acfcdb01f9acae158a550530b159a3399fa Mon Sep 17 00:00:00 2001 From: Thierry Zamofing Date: Fri, 2 Sep 2022 18:40:14 +0200 Subject: [PATCH] add escape steps, collomator and post tube motion --- swissmx.py | 376 +++++++++++++++++++++++++++-------------------------- 1 file changed, 192 insertions(+), 184 deletions(-) diff --git a/swissmx.py b/swissmx.py index da550de..3c21317 100755 --- a/swissmx.py +++ b/swissmx.py @@ -256,6 +256,7 @@ class WndSwissMx(QMainWindow, Ui_MainWindow): QtGui.QFontDatabase.addApplicationFont("fonts/Inconsolata-Bold.ttf") QtGui.QFontDatabase.addApplicationFont("fonts/Baloo-Regular.ttf") + # TODO: Cleanup many member functions that are unused or obsolete self._pv_shutter = None # epics.PV('X06SA-ES-MD2:SHUTTER') self._has_pulse_picker = False self._at_x06sa = False @@ -270,7 +271,7 @@ class WndSwissMx(QMainWindow, Ui_MainWindow): self.init_graphics() - self._escape_current_state = "Maintenance" + self._esc_state ="Maintenance" self._pin_mounting_offset = 0.0 self._mouse_tracking = False @@ -796,12 +797,14 @@ class WndSwissMx(QMainWindow, Ui_MainWindow): self.track_objects() def cb_move_backlight_safe(self, pos): + app=QApplication.instance() + bl=app._backlight # any move of backlight requires post sample tube out try: self.assert_post_tube_position(pos="out") except: self.move_post_tube("out") - backlight.move(pos) + bl.move(pos) def get_tweaker(self, rec, alias=None, label=None, mtype="epics_motor", layout=None, **kwargs): app = QApplication.instance() @@ -1464,46 +1467,47 @@ class WndSwissMx(QMainWindow, Ui_MainWindow): self.daq_embl_collect_points() def cb_esc_sample_exchange(self): - self._escape_current_state = "busy" + app=QApplication.instance() + self._esc_state ="busy" steps = [] - if option(CRYOJET_MOTION_ENABLED): - steps.append(lambda: self.move_cryojet_nozzle("out")) + #if option(CRYOJET_MOTION_ENABLED): + # steps.append(lambda: self.move_cryojet_nozzle("out")) steps.extend( [ lambda: self.move_post_tube("out"), - lambda: backlight.move("out", wait=True), + lambda: app._backlight.move("out", wait=True), lambda: self.move_collimator("out"), ] ) - self.escape_run_steps(steps, "Transitioning to Sample Exchange") - self._escape_current_state = "ManualSampleExchange" + self.esc_run_steps(steps, "Transitioning to Sample Exchange") + self._esc_state ="ManualSampleExchange" def cb_esc_sample_alignment(self): app=QApplication.instance() - self._escape_current_state = "busy" - + self._esc_state ="busy" steps = [ # lambda: sample_selection.tell.set_current(30.0), lambda: self.move_collimator("ready") ] - if option(CRYOJET_MOTION_ENABLED): - steps.extend([lambda: self.move_cryojet_nozzle("in")]) + #if option(CRYOJET_MOTION_ENABLED): + # steps.extend([lambda: self.move_cryojet_nozzle("in")]) steps.extend([lambda: self.move_post_tube("out"), lambda: app._backlight.move("in")]) - self.escape_run_steps(steps, "Transitioning to Sample Alignment") - self._escape_current_state = "SampleAlignment" + self.esc_run_steps(steps, "Transitioning to Sample Alignment") + self._esc_state ="SampleAlignment" def cb_esc_data_collection(self): - self._escape_current_state = "busy" + app=QApplication.instance() + self._esc_state ="busy" steps = [ # lambda: sample_selection.tell.set_current(30.0), - lambda: backlight.move("out", assert_positions=True), + lambda: app._backlight.move("out"), lambda: self.move_post_tube("in"), lambda: self.move_collimator("in"), ] - if option(CRYOJET_MOTION_ENABLED): - steps.extend([lambda: self.move_cryojet_nozzle("in")]) - self.escape_run_steps(steps, "Transitioning to Data Collection") - self._escape_current_state = "DataCollection" + #if option(CRYOJET_MOTION_ENABLED): + # steps.extend([lambda: self.move_cryojet_nozzle("in")]) + self.esc_run_steps(steps, "Transitioning to Data Collection") + self._esc_state ="DataCollection" def cb_really_quit(self): """called when user Ctrl-Q the app""" @@ -1515,6 +1519,37 @@ class WndSwissMx(QMainWindow, Ui_MainWindow): _log.warning("homing fast stages") epics.PV("SAR-EXPMX1:ASYN.AOUT").put(b"enable plc 1") + def cb_testcode(self): + try: + tc=self._testCode + tc['idx']+=1 + except AttributeError: + self._testCode=tc={'idx':0} + step=tc['idx'] + vb=self.vb + + if step==0: + grp=pg.ItemGroup() + vb.addItem(grp) + obj=UsrGO.Marker((100, 100), (100, 100), mode=1) + grp.addItem(obj) + obj=UsrGO.Marker((150, 100), (50, 50), mode=1) + grp.addItem(obj) + obj=UsrGO.Marker((200, 100), (100, 100), mode=1) + grp.addItem(obj) + tc['grp']=grp + elif step==1: + grp=tc['grp'] + tr=grp.transform() + # UsrGO.obj_info(tr) + tr.setMatrix(1, .2, 0, + -.2, 1, 0, + 0, 0, 1) + grp.setTransform(tr) + + print(vb.childGroup.childItems()) + pass + def prepare_left_tabs(self): tabs = self._left_tabs tabs.currentChanged.connect(self.cb_switch_task) @@ -1947,36 +1982,142 @@ class WndSwissMx(QMainWindow, Ui_MainWindow): #plt.show(block=True) return - def cb_testcode(self): - try: - tc=self._testCode - tc['idx']+=1 - except AttributeError: - self._testCode=tc={'idx':0} - step=tc['idx'] - vb=self.vb + def esc_run_steps(self, steps, title): + with pg.ProgressDialog(title, 0, len(steps)) as dlg: + for step in steps: + step() + dlg += 1 + if dlg.wasCanceled(): + QMessageBox.warning(self, "escape steps", "ABORTED" + title) + break - if step==0: - grp=pg.ItemGroup() - vb.addItem(grp) - obj=UsrGO.Marker((100,100), (100,100),mode=1) - grp.addItem(obj) - obj=UsrGO.Marker((150, 100), (50, 50), mode=1) - grp.addItem(obj) - obj=UsrGO.Marker((200, 100), (100, 100), mode=1) - grp.addItem(obj) - tc['grp']=grp - elif step==1: - grp=tc['grp'] - tr=grp.transform() - #UsrGO.obj_info(tr) - tr.setMatrix(1, .2, 0, - -.2, 1, 0, - 0, 0, 1) - grp.setTransform(tr) + def move_post_tube(self, dir): + app=QApplication.instance() + cfg=app._cfg + x_up = cfg.value(AppCfg.PST_X_UP , np.NaN,type=float) + y_up = cfg.value(AppCfg.PST_Y_UP , np.NaN,type=float) + x_down = cfg.value(AppCfg.PST_X_DOWN, np.NaN,type=float) + y_down = cfg.value(AppCfg.PST_Y_DOWN, np.NaN,type=float) + dx = cfg.value(AppCfg.PST_DX , np.NaN,type=float) + dy = cfg.value(AppCfg.PST_DY , np.NaN,type=float) + tz_in = cfg.value(AppCfg.PST_TZ_IN , np.NaN,type=float) + tz_out = cfg.value(AppCfg.PST_TZ_OUT, np.NaN,type=float) + if np.isnan(x_up): + msg = "SwissMX *POST-SAMPLE-TUBE* configuration is incomplete!!!" + _log.warning(msg) + QMessageBox.warning(self, "post tube not configured", msg) + return + + usy = self.tweakers["tube_usy"] + dsy = self.tweakers["tube_dsy"] + + usx = self.tweakers["tube_usx"] + dsx = self.tweakers["tube_dsx"] + tube_z = self.tweakers["tube_z"] + + tandem_twv = float(self._post_tandem_tweak_val.text()) + + if dir == "in": + _log.info("move post sample tube in") + usy.move_abs(y_up) + dsy.move_abs(y_down) + usx.move_abs(x_up) + dsx.move_abs(x_down) + try: + app_utils.assert_tweaker_positions([ + (usy, y_up, 0.1), + (dsy, y_down, 0.1), + (usx, x_up, 0.1), + (dsx, x_down, 0.1), ],timeout=10.0, + ) + except app_utils.PositionsNotReached as e: + _log.warning("failed to move post sample tube {}".format(dir)) + _log.warning(e) + QMessageBox.warning(self, "failed to move post sample tube XY {in}", "failed to move post sample tube XY {in}",) + raise + tube_z.move_abs(tz_in, wait=True) + try: + app_utils.assert_tweaker_positions([(tube_z, tz_in, 0.1)]) + except app_utils.PositionsNotReached as e: + _log.warning("failed to move post sample tube Z {in}") + _log.warning(e) + QMessageBox.warning(self, "failed to move post sample tube Z {in}", "failed to move post sample tube Z {in}",) + raise + + elif dir == "out": + _log.info("move post sample tube out") + tube_z.move_abs(tz_out, wait=True) + try: + app_utils.assert_tweaker_positions([(tube_z, tz_out, 0.1)]) + except app_utils.PositionsNotReached as e: + _log.warning("failed to move post sample tube {out}") + _log.warning(e) + QMessageBox.warning(self,"failed to move post sample tube Z {out}","failed to move post sample tube Z {out}",) + raise + usy.move_abs(y_up+dy) + dsy.move_abs(y_down+dy) + usx.move_abs(x_up+dx) + dsx.move_abs(x_down+dx) + try: + app_utils.assert_tweaker_positions([ + (usy, y_up + dy, 0.1), + (dsy, y_down + dy, 0.1), + (usx, x_up + dx, 0.1), + (dsx, x_down + dx, 0.1), ], timeout=10.0, + ) + except app_utils.PositionsNotReached as e: + _log.warning("failed to move post sample tube {}".format(dir)) + _log.warning(e) + QMessageBox.warning(self,"failed to move post sample tube XY {out}","failed to move post sample tube XY {out}",) + raise + elif dir == "x-pos": + _log.info("tamdem move post sample tube X-pos by {} mm".format(tandem_twv)) + usx.move_rel(tandem_twv) + dsx.move_rel(tandem_twv) + elif dir == "x-neg": + _log.info("tamdem move post sample tube X-neg {} mm".format(tandem_twv)) + usx.move_rel(-tandem_twv) + dsx.move_rel(-tandem_twv) + elif dir == "up": + _log.info("tamdem move post sample tube UP {} mm".format(tandem_twv)) + usy.move_rel(tandem_twv) + dsy.move_rel(tandem_twv) + elif dir == "down": + _log.info("tamdem move post sample tube DOWN {} mm".format(tandem_twv)) + usy.move_rel(-tandem_twv) + dsy.move_rel(-tandem_twv) + + def move_collimator(self, pos): + app=QApplication.instance() + cfg=app._cfg + cx = self.tweakers["colli_x"] + cy = self.tweakers["colli_y"] + + x_pos = cfg.value(AppCfg.COL_X_IN, None, type=float) + y_pos = cfg.value(AppCfg.COL_Y_IN, np.NaN, type=float) + dx = cfg.value(AppCfg.COL_DX, np.NaN, type=float) + dy = cfg.value(AppCfg.COL_DY, np.NaN, type=float) + + if np.isnan(x_pos + y_pos + dx + dy): + msg="COLLIMATOR configuration is incomplete!" + _log.warning(msg) + QMessageBox.warning(self, "post tube not configured", msg) + return + + _log.info("moving collimator {} to X,Y = {:.3f}, {:.3f}".format(pos, x_pos, y_pos)) + + if pos == "out": + cy.move_abs(y_pos+dy, assert_position=True) + cx.move_abs(x_pos+dx, assert_position=True) + elif pos == "in": + cx.move_abs(x_pos, assert_position=True) + cy.move_abs(y_pos, assert_position=True) + elif pos == "ready": + cx.move_abs(x_pos, assert_position=True) + cy.move_abs(y_pos+dy, assert_position=True) + else: + raise ValueError("Collimator position *{}* is not known!!") - print(vb.childGroup.childItems()) - pass # **************** OBSOLETE AND/OR OLD STUFF **************** @@ -2207,8 +2348,8 @@ class WndSwissMx(QMainWindow, Ui_MainWindow): if msg is None: return if "current" in msg: - _log.warning(f"current state: {self._escape_current_state}") - zescape.reply(self._escape_current_state) + _log.warning(f"current state: {self._esc_state}") + zescape.reply(self._esc_state) elif "goto" in msg: state = msg.split()[1].lower() _log.warning(f"TELL requests to go to {state}") @@ -2222,7 +2363,7 @@ class WndSwissMx(QMainWindow, Ui_MainWindow): self.cb_esc_sample_alignment() except: zescape.reply("Maintenance") - zescape.reply(self._escape_current_state) + zescape.reply(self._esc_state) else: # JSON data = json.loads(msg) if "sampleName" in data: @@ -2432,14 +2573,6 @@ class WndSwissMx(QMainWindow, Ui_MainWindow): bx, by = 500, 500 return (bx, by) - def _OLD_escape_run_steps(self, steps, title): - with pg.ProgressDialog(title, 0, len(steps)) as dlg: - for step in steps: - step() - dlg += 1 - if dlg.wasCanceled(): - raise TransitionAborted("ABORTED" + title) - def _OLD_move_gonio_to_position(self, fx, fy, bx, bz, omega): self.tweakers["fast_x"].motor.move(fx, wait=False, ignore_limits=True) self.tweakers["fast_y"].motor.move(fy, wait=False, ignore_limits=True) @@ -2591,35 +2724,6 @@ class WndSwissMx(QMainWindow, Ui_MainWindow): cz.move(t_cz, wait=True, ignore_limits=True) omega.move(t_omega, wait=True, ignore_limits=True) - def _OLD_move_collimator(self, pos): - cx = self.tweakers["colli_x"] - cy = self.tweakers["colli_y"] - - x_pos = settings.value("collimator/x_in", 1e10, type=float) - y_pos = settings.value("collimator/y_in", 1e10, type=float) - dx = settings.value("collimator/dx", 1e10, type=float) - dy = settings.value("collimator/dy", 1e10, type=float) - - _log.info('Skip Zac''s orig. code') - return - # ZAC: orig. code - if 1e9 < x_pos + y_pos + dx + dy: - raise IncompleteConfiguration("COLLIMATOR configuration is incomplete!") - - _log.info("moving collimator {} to X,Y = {:.3f}, {:.3f}".format(pos, x_pos, y_pos)) - - if pos == "out": - cy.move_abs(y_pos+dy, assert_position=True) - cx.move_abs(x_pos+dx, assert_position=True) - elif pos == "in": - cx.move_abs(x_pos, assert_position=True) - cy.move_abs(y_pos, assert_position=True) - elif pos == "ready": - cx.move_abs(x_pos, assert_position=True) - cy.move_abs(y_pos+dy, assert_position=True) - else: - raise UnknownLabeledPosition("Collimator position *{}* is not known!!") - def _OLD_move_cryojet_nozzle(self, pos): cx = self.tweakers["cryo"] if "in" == pos.lower(): @@ -2707,102 +2811,6 @@ class WndSwissMx(QMainWindow, Ui_MainWindow): (tbz, z, 0.1),],timeout=2.0, ) - def _OLD_move_post_tube(self, dir): - app=QApplication.instance() - cfg=app._cfg - x_up = cfg.value(AppCfg.PST_X_UP , type=float) - y_up = cfg.value(AppCfg.PST_Y_UP , type=float) - x_down = cfg.value(AppCfg.PST_X_DOWN, type=float) - y_down = cfg.value(AppCfg.PST_Y_DOWN, type=float) - dx = cfg.value(AppCfg.PST_DX , type=float) - dy = cfg.value(AppCfg.PST_DY , type=float) - tz_in = cfg.value(AppCfg.PST_TZ_IN , type=float) - tz_out = cfg.value(AppCfg.PST_TZ_OUT, type=float) - if x_up is None: - msg = "SwissMX *POST-SAMPLE-TUBE* configuration is incomplete!!!" - _log.warning(msg) - QMessageBox.warning(self, "post tube not configured", msg) - return - - usy = self.tweakers["tube_usy"] - dsy = self.tweakers["tube_dsy"] - - usx = self.tweakers["tube_usx"] - dsx = self.tweakers["tube_dsx"] - tube_z = self.tweakers["tube_z"] - - tandem_twv = float(self._post_tandem_tweak_val.text()) - - if dir == "in": - _log.info("move post sample tube in") - usy.move_abs(y_up) - dsy.move_abs(y_down) - usx.move_abs(x_up) - dsx.move_abs(x_down) - try: - app_utils.assert_tweaker_positions([ - (usy, y_up, 0.1), - (dsy, y_down, 0.1), - (usx, x_up, 0.1), - (dsx, x_down, 0.1), ],timeout=10.0, - ) - except app_utils.PositionsNotReached as e: - _log.warning("failed to move post sample tube {}".format(dir)) - _log.warning(e) - QMessageBox.warning(self, "failed to move post sample tube XY {in}", "failed to move post sample tube XY {in}",) - raise - tube_z.move_abs(tz_in, wait=True) - try: - app_utils.assert_tweaker_positions([(tube_z, tz_in, 0.1)]) - except app_utils.PositionsNotReached as e: - _log.warning("failed to move post sample tube Z {in}") - _log.warning(e) - QMessageBox.warning(self, "failed to move post sample tube Z {in}", "failed to move post sample tube Z {in}",) - raise - - elif dir == "out": - _log.info("move post sample tube out") - tube_z.move_abs(tz_out, wait=True) - try: - app_utils.assert_tweaker_positions([(tube_z, tz_out, 0.1)]) - except app_utils.PositionsNotReached as e: - _log.warning("failed to move post sample tube {out}") - _log.warning(e) - QMessageBox.warning(self,"failed to move post sample tube Z {out}","failed to move post sample tube Z {out}",) - raise - usy.move_abs(y_up+dy) - dsy.move_abs(y_down+dy) - usx.move_abs(x_up+dx) - dsx.move_abs(x_down+dx) - try: - app_utils.assert_tweaker_positions([ - (usy, y_up + dy, 0.1), - (dsy, y_down + dy, 0.1), - (usx, x_up + dx, 0.1), - (dsx, x_down + dx, 0.1), ], timeout=10.0, - ) - except app_utils.PositionsNotReached as e: - _log.warning("failed to move post sample tube {}".format(dir)) - _log.warning(e) - QMessageBox.warning(self,"failed to move post sample tube XY {out}","failed to move post sample tube XY {out}",) - raise - elif dir == "x-pos": - _log.info("tamdem move post sample tube X-pos by {} mm".format(tandem_twv)) - usx.move_rel(tandem_twv) - dsx.move_rel(tandem_twv) - elif dir == "x-neg": - _log.info("tamdem move post sample tube X-neg {} mm".format(tandem_twv)) - usx.move_rel(-tandem_twv) - dsx.move_rel(-tandem_twv) - elif dir == "up": - _log.info("tamdem move post sample tube UP {} mm".format(tandem_twv)) - usy.move_rel(tandem_twv) - dsy.move_rel(tandem_twv) - elif dir == "down": - _log.info("tamdem move post sample tube DOWN {} mm".format(tandem_twv)) - usy.move_rel(-tandem_twv) - dsy.move_rel(-tandem_twv) - def _OLD_add_tweaker(self, pv, alias=None, label=None, mtype="epics_motor", layout=None): if layout is None: layout = self._tweak_container.layout()