add escape steps, collomator and post tube motion

This commit is contained in:
2022-09-02 18:40:14 +02:00
parent 6e3e1cd8ab
commit c11e0acfcd

View File

@@ -256,6 +256,7 @@ class WndSwissMx(QMainWindow, Ui_MainWindow):
QtGui.QFontDatabase.addApplicationFont("fonts/Inconsolata-Bold.ttf")
QtGui.QFontDatabase.addApplicationFont("fonts/Baloo-Regular.ttf")
# TODO: Cleanup many member functions that are unused or obsolete
self._pv_shutter = None # epics.PV('X06SA-ES-MD2:SHUTTER')
self._has_pulse_picker = False
self._at_x06sa = False
@@ -270,7 +271,7 @@ class WndSwissMx(QMainWindow, Ui_MainWindow):
self.init_graphics()
self._escape_current_state = "Maintenance"
self._esc_state ="Maintenance"
self._pin_mounting_offset = 0.0
self._mouse_tracking = False
@@ -796,12 +797,14 @@ class WndSwissMx(QMainWindow, Ui_MainWindow):
self.track_objects()
def cb_move_backlight_safe(self, pos):
app=QApplication.instance()
bl=app._backlight
# any move of backlight requires post sample tube out
try:
self.assert_post_tube_position(pos="out")
except:
self.move_post_tube("out")
backlight.move(pos)
bl.move(pos)
def get_tweaker(self, rec, alias=None, label=None, mtype="epics_motor", layout=None, **kwargs):
app = QApplication.instance()
@@ -1464,46 +1467,47 @@ class WndSwissMx(QMainWindow, Ui_MainWindow):
self.daq_embl_collect_points()
def cb_esc_sample_exchange(self):
self._escape_current_state = "busy"
app=QApplication.instance()
self._esc_state ="busy"
steps = []
if option(CRYOJET_MOTION_ENABLED):
steps.append(lambda: self.move_cryojet_nozzle("out"))
#if option(CRYOJET_MOTION_ENABLED):
# steps.append(lambda: self.move_cryojet_nozzle("out"))
steps.extend(
[
lambda: self.move_post_tube("out"),
lambda: backlight.move("out", wait=True),
lambda: app._backlight.move("out", wait=True),
lambda: self.move_collimator("out"),
]
)
self.escape_run_steps(steps, "Transitioning to Sample Exchange")
self._escape_current_state = "ManualSampleExchange"
self.esc_run_steps(steps, "Transitioning to Sample Exchange")
self._esc_state ="ManualSampleExchange"
def cb_esc_sample_alignment(self):
app=QApplication.instance()
self._escape_current_state = "busy"
self._esc_state ="busy"
steps = [
# lambda: sample_selection.tell.set_current(30.0),
lambda: self.move_collimator("ready")
]
if option(CRYOJET_MOTION_ENABLED):
steps.extend([lambda: self.move_cryojet_nozzle("in")])
#if option(CRYOJET_MOTION_ENABLED):
# steps.extend([lambda: self.move_cryojet_nozzle("in")])
steps.extend([lambda: self.move_post_tube("out"), lambda: app._backlight.move("in")])
self.escape_run_steps(steps, "Transitioning to Sample Alignment")
self._escape_current_state = "SampleAlignment"
self.esc_run_steps(steps, "Transitioning to Sample Alignment")
self._esc_state ="SampleAlignment"
def cb_esc_data_collection(self):
self._escape_current_state = "busy"
app=QApplication.instance()
self._esc_state ="busy"
steps = [
# lambda: sample_selection.tell.set_current(30.0),
lambda: backlight.move("out", assert_positions=True),
lambda: app._backlight.move("out"),
lambda: self.move_post_tube("in"),
lambda: self.move_collimator("in"),
]
if option(CRYOJET_MOTION_ENABLED):
steps.extend([lambda: self.move_cryojet_nozzle("in")])
self.escape_run_steps(steps, "Transitioning to Data Collection")
self._escape_current_state = "DataCollection"
#if option(CRYOJET_MOTION_ENABLED):
# steps.extend([lambda: self.move_cryojet_nozzle("in")])
self.esc_run_steps(steps, "Transitioning to Data Collection")
self._esc_state ="DataCollection"
def cb_really_quit(self):
"""called when user Ctrl-Q the app"""
@@ -1515,6 +1519,37 @@ class WndSwissMx(QMainWindow, Ui_MainWindow):
_log.warning("homing fast stages")
epics.PV("SAR-EXPMX1:ASYN.AOUT").put(b"enable plc 1")
def cb_testcode(self):
try:
tc=self._testCode
tc['idx']+=1
except AttributeError:
self._testCode=tc={'idx':0}
step=tc['idx']
vb=self.vb
if step==0:
grp=pg.ItemGroup()
vb.addItem(grp)
obj=UsrGO.Marker((100, 100), (100, 100), mode=1)
grp.addItem(obj)
obj=UsrGO.Marker((150, 100), (50, 50), mode=1)
grp.addItem(obj)
obj=UsrGO.Marker((200, 100), (100, 100), mode=1)
grp.addItem(obj)
tc['grp']=grp
elif step==1:
grp=tc['grp']
tr=grp.transform()
# UsrGO.obj_info(tr)
tr.setMatrix(1, .2, 0,
-.2, 1, 0,
0, 0, 1)
grp.setTransform(tr)
print(vb.childGroup.childItems())
pass
def prepare_left_tabs(self):
tabs = self._left_tabs
tabs.currentChanged.connect(self.cb_switch_task)
@@ -1947,36 +1982,142 @@ class WndSwissMx(QMainWindow, Ui_MainWindow):
#plt.show(block=True)
return
def cb_testcode(self):
try:
tc=self._testCode
tc['idx']+=1
except AttributeError:
self._testCode=tc={'idx':0}
step=tc['idx']
vb=self.vb
def esc_run_steps(self, steps, title):
with pg.ProgressDialog(title, 0, len(steps)) as dlg:
for step in steps:
step()
dlg += 1
if dlg.wasCanceled():
QMessageBox.warning(self, "escape steps", "ABORTED" + title)
break
if step==0:
grp=pg.ItemGroup()
vb.addItem(grp)
obj=UsrGO.Marker((100,100), (100,100),mode=1)
grp.addItem(obj)
obj=UsrGO.Marker((150, 100), (50, 50), mode=1)
grp.addItem(obj)
obj=UsrGO.Marker((200, 100), (100, 100), mode=1)
grp.addItem(obj)
tc['grp']=grp
elif step==1:
grp=tc['grp']
tr=grp.transform()
#UsrGO.obj_info(tr)
tr.setMatrix(1, .2, 0,
-.2, 1, 0,
0, 0, 1)
grp.setTransform(tr)
def move_post_tube(self, dir):
app=QApplication.instance()
cfg=app._cfg
x_up = cfg.value(AppCfg.PST_X_UP , np.NaN,type=float)
y_up = cfg.value(AppCfg.PST_Y_UP , np.NaN,type=float)
x_down = cfg.value(AppCfg.PST_X_DOWN, np.NaN,type=float)
y_down = cfg.value(AppCfg.PST_Y_DOWN, np.NaN,type=float)
dx = cfg.value(AppCfg.PST_DX , np.NaN,type=float)
dy = cfg.value(AppCfg.PST_DY , np.NaN,type=float)
tz_in = cfg.value(AppCfg.PST_TZ_IN , np.NaN,type=float)
tz_out = cfg.value(AppCfg.PST_TZ_OUT, np.NaN,type=float)
if np.isnan(x_up):
msg = "SwissMX *POST-SAMPLE-TUBE* configuration is incomplete!!!"
_log.warning(msg)
QMessageBox.warning(self, "post tube not configured", msg)
return
usy = self.tweakers["tube_usy"]
dsy = self.tweakers["tube_dsy"]
usx = self.tweakers["tube_usx"]
dsx = self.tweakers["tube_dsx"]
tube_z = self.tweakers["tube_z"]
tandem_twv = float(self._post_tandem_tweak_val.text())
if dir == "in":
_log.info("move post sample tube in")
usy.move_abs(y_up)
dsy.move_abs(y_down)
usx.move_abs(x_up)
dsx.move_abs(x_down)
try:
app_utils.assert_tweaker_positions([
(usy, y_up, 0.1),
(dsy, y_down, 0.1),
(usx, x_up, 0.1),
(dsx, x_down, 0.1), ],timeout=10.0,
)
except app_utils.PositionsNotReached as e:
_log.warning("failed to move post sample tube {}".format(dir))
_log.warning(e)
QMessageBox.warning(self, "failed to move post sample tube XY {in}", "failed to move post sample tube XY {in}",)
raise
tube_z.move_abs(tz_in, wait=True)
try:
app_utils.assert_tweaker_positions([(tube_z, tz_in, 0.1)])
except app_utils.PositionsNotReached as e:
_log.warning("failed to move post sample tube Z {in}")
_log.warning(e)
QMessageBox.warning(self, "failed to move post sample tube Z {in}", "failed to move post sample tube Z {in}",)
raise
elif dir == "out":
_log.info("move post sample tube out")
tube_z.move_abs(tz_out, wait=True)
try:
app_utils.assert_tweaker_positions([(tube_z, tz_out, 0.1)])
except app_utils.PositionsNotReached as e:
_log.warning("failed to move post sample tube {out}")
_log.warning(e)
QMessageBox.warning(self,"failed to move post sample tube Z {out}","failed to move post sample tube Z {out}",)
raise
usy.move_abs(y_up+dy)
dsy.move_abs(y_down+dy)
usx.move_abs(x_up+dx)
dsx.move_abs(x_down+dx)
try:
app_utils.assert_tweaker_positions([
(usy, y_up + dy, 0.1),
(dsy, y_down + dy, 0.1),
(usx, x_up + dx, 0.1),
(dsx, x_down + dx, 0.1), ], timeout=10.0,
)
except app_utils.PositionsNotReached as e:
_log.warning("failed to move post sample tube {}".format(dir))
_log.warning(e)
QMessageBox.warning(self,"failed to move post sample tube XY {out}","failed to move post sample tube XY {out}",)
raise
elif dir == "x-pos":
_log.info("tamdem move post sample tube X-pos by {} mm".format(tandem_twv))
usx.move_rel(tandem_twv)
dsx.move_rel(tandem_twv)
elif dir == "x-neg":
_log.info("tamdem move post sample tube X-neg {} mm".format(tandem_twv))
usx.move_rel(-tandem_twv)
dsx.move_rel(-tandem_twv)
elif dir == "up":
_log.info("tamdem move post sample tube UP {} mm".format(tandem_twv))
usy.move_rel(tandem_twv)
dsy.move_rel(tandem_twv)
elif dir == "down":
_log.info("tamdem move post sample tube DOWN {} mm".format(tandem_twv))
usy.move_rel(-tandem_twv)
dsy.move_rel(-tandem_twv)
def move_collimator(self, pos):
app=QApplication.instance()
cfg=app._cfg
cx = self.tweakers["colli_x"]
cy = self.tweakers["colli_y"]
x_pos = cfg.value(AppCfg.COL_X_IN, None, type=float)
y_pos = cfg.value(AppCfg.COL_Y_IN, np.NaN, type=float)
dx = cfg.value(AppCfg.COL_DX, np.NaN, type=float)
dy = cfg.value(AppCfg.COL_DY, np.NaN, type=float)
if np.isnan(x_pos + y_pos + dx + dy):
msg="COLLIMATOR configuration is incomplete!"
_log.warning(msg)
QMessageBox.warning(self, "post tube not configured", msg)
return
_log.info("moving collimator {} to X,Y = {:.3f}, {:.3f}".format(pos, x_pos, y_pos))
if pos == "out":
cy.move_abs(y_pos+dy, assert_position=True)
cx.move_abs(x_pos+dx, assert_position=True)
elif pos == "in":
cx.move_abs(x_pos, assert_position=True)
cy.move_abs(y_pos, assert_position=True)
elif pos == "ready":
cx.move_abs(x_pos, assert_position=True)
cy.move_abs(y_pos+dy, assert_position=True)
else:
raise ValueError("Collimator position *{}* is not known!!")
print(vb.childGroup.childItems())
pass
# **************** OBSOLETE AND/OR OLD STUFF ****************
@@ -2207,8 +2348,8 @@ class WndSwissMx(QMainWindow, Ui_MainWindow):
if msg is None:
return
if "current" in msg:
_log.warning(f"current state: {self._escape_current_state}")
zescape.reply(self._escape_current_state)
_log.warning(f"current state: {self._esc_state}")
zescape.reply(self._esc_state)
elif "goto" in msg:
state = msg.split()[1].lower()
_log.warning(f"TELL requests to go to {state}")
@@ -2222,7 +2363,7 @@ class WndSwissMx(QMainWindow, Ui_MainWindow):
self.cb_esc_sample_alignment()
except:
zescape.reply("Maintenance")
zescape.reply(self._escape_current_state)
zescape.reply(self._esc_state)
else: # JSON
data = json.loads(msg)
if "sampleName" in data:
@@ -2432,14 +2573,6 @@ class WndSwissMx(QMainWindow, Ui_MainWindow):
bx, by = 500, 500
return (bx, by)
def _OLD_escape_run_steps(self, steps, title):
with pg.ProgressDialog(title, 0, len(steps)) as dlg:
for step in steps:
step()
dlg += 1
if dlg.wasCanceled():
raise TransitionAborted("ABORTED" + title)
def _OLD_move_gonio_to_position(self, fx, fy, bx, bz, omega):
self.tweakers["fast_x"].motor.move(fx, wait=False, ignore_limits=True)
self.tweakers["fast_y"].motor.move(fy, wait=False, ignore_limits=True)
@@ -2591,35 +2724,6 @@ class WndSwissMx(QMainWindow, Ui_MainWindow):
cz.move(t_cz, wait=True, ignore_limits=True)
omega.move(t_omega, wait=True, ignore_limits=True)
def _OLD_move_collimator(self, pos):
cx = self.tweakers["colli_x"]
cy = self.tweakers["colli_y"]
x_pos = settings.value("collimator/x_in", 1e10, type=float)
y_pos = settings.value("collimator/y_in", 1e10, type=float)
dx = settings.value("collimator/dx", 1e10, type=float)
dy = settings.value("collimator/dy", 1e10, type=float)
_log.info('Skip Zac''s orig. code')
return
# ZAC: orig. code
if 1e9 < x_pos + y_pos + dx + dy:
raise IncompleteConfiguration("COLLIMATOR configuration is incomplete!")
_log.info("moving collimator {} to X,Y = {:.3f}, {:.3f}".format(pos, x_pos, y_pos))
if pos == "out":
cy.move_abs(y_pos+dy, assert_position=True)
cx.move_abs(x_pos+dx, assert_position=True)
elif pos == "in":
cx.move_abs(x_pos, assert_position=True)
cy.move_abs(y_pos, assert_position=True)
elif pos == "ready":
cx.move_abs(x_pos, assert_position=True)
cy.move_abs(y_pos+dy, assert_position=True)
else:
raise UnknownLabeledPosition("Collimator position *{}* is not known!!")
def _OLD_move_cryojet_nozzle(self, pos):
cx = self.tweakers["cryo"]
if "in" == pos.lower():
@@ -2707,102 +2811,6 @@ class WndSwissMx(QMainWindow, Ui_MainWindow):
(tbz, z, 0.1),],timeout=2.0,
)
def _OLD_move_post_tube(self, dir):
app=QApplication.instance()
cfg=app._cfg
x_up = cfg.value(AppCfg.PST_X_UP , type=float)
y_up = cfg.value(AppCfg.PST_Y_UP , type=float)
x_down = cfg.value(AppCfg.PST_X_DOWN, type=float)
y_down = cfg.value(AppCfg.PST_Y_DOWN, type=float)
dx = cfg.value(AppCfg.PST_DX , type=float)
dy = cfg.value(AppCfg.PST_DY , type=float)
tz_in = cfg.value(AppCfg.PST_TZ_IN , type=float)
tz_out = cfg.value(AppCfg.PST_TZ_OUT, type=float)
if x_up is None:
msg = "SwissMX *POST-SAMPLE-TUBE* configuration is incomplete!!!"
_log.warning(msg)
QMessageBox.warning(self, "post tube not configured", msg)
return
usy = self.tweakers["tube_usy"]
dsy = self.tweakers["tube_dsy"]
usx = self.tweakers["tube_usx"]
dsx = self.tweakers["tube_dsx"]
tube_z = self.tweakers["tube_z"]
tandem_twv = float(self._post_tandem_tweak_val.text())
if dir == "in":
_log.info("move post sample tube in")
usy.move_abs(y_up)
dsy.move_abs(y_down)
usx.move_abs(x_up)
dsx.move_abs(x_down)
try:
app_utils.assert_tweaker_positions([
(usy, y_up, 0.1),
(dsy, y_down, 0.1),
(usx, x_up, 0.1),
(dsx, x_down, 0.1), ],timeout=10.0,
)
except app_utils.PositionsNotReached as e:
_log.warning("failed to move post sample tube {}".format(dir))
_log.warning(e)
QMessageBox.warning(self, "failed to move post sample tube XY {in}", "failed to move post sample tube XY {in}",)
raise
tube_z.move_abs(tz_in, wait=True)
try:
app_utils.assert_tweaker_positions([(tube_z, tz_in, 0.1)])
except app_utils.PositionsNotReached as e:
_log.warning("failed to move post sample tube Z {in}")
_log.warning(e)
QMessageBox.warning(self, "failed to move post sample tube Z {in}", "failed to move post sample tube Z {in}",)
raise
elif dir == "out":
_log.info("move post sample tube out")
tube_z.move_abs(tz_out, wait=True)
try:
app_utils.assert_tweaker_positions([(tube_z, tz_out, 0.1)])
except app_utils.PositionsNotReached as e:
_log.warning("failed to move post sample tube {out}")
_log.warning(e)
QMessageBox.warning(self,"failed to move post sample tube Z {out}","failed to move post sample tube Z {out}",)
raise
usy.move_abs(y_up+dy)
dsy.move_abs(y_down+dy)
usx.move_abs(x_up+dx)
dsx.move_abs(x_down+dx)
try:
app_utils.assert_tweaker_positions([
(usy, y_up + dy, 0.1),
(dsy, y_down + dy, 0.1),
(usx, x_up + dx, 0.1),
(dsx, x_down + dx, 0.1), ], timeout=10.0,
)
except app_utils.PositionsNotReached as e:
_log.warning("failed to move post sample tube {}".format(dir))
_log.warning(e)
QMessageBox.warning(self,"failed to move post sample tube XY {out}","failed to move post sample tube XY {out}",)
raise
elif dir == "x-pos":
_log.info("tamdem move post sample tube X-pos by {} mm".format(tandem_twv))
usx.move_rel(tandem_twv)
dsx.move_rel(tandem_twv)
elif dir == "x-neg":
_log.info("tamdem move post sample tube X-neg {} mm".format(tandem_twv))
usx.move_rel(-tandem_twv)
dsx.move_rel(-tandem_twv)
elif dir == "up":
_log.info("tamdem move post sample tube UP {} mm".format(tandem_twv))
usy.move_rel(tandem_twv)
dsy.move_rel(tandem_twv)
elif dir == "down":
_log.info("tamdem move post sample tube DOWN {} mm".format(tandem_twv))
usy.move_rel(-tandem_twv)
dsy.move_rel(-tandem_twv)
def _OLD_add_tweaker(self, pv, alias=None, label=None, mtype="epics_motor", layout=None):
if layout is None:
layout = self._tweak_container.layout()