314 lines
11 KiB
Python
Executable File
314 lines
11 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# *-----------------------------------------------------------------------*
|
|
# | |
|
|
# | Copyright (c) 2014 by Paul Scherrer Institute (http://www.psi.ch) |
|
|
# | |
|
|
# | Author Thierry Zamofing (thierry.zamofing@psi.ch) |
|
|
# *-----------------------------------------------------------------------*
|
|
"""
|
|
motion procedures for PSSS:
|
|
|
|
mode:
|
|
0: homing
|
|
1: calc energy and SPECTRUM_X out of camera arm angle
|
|
move CamPosX out of CristBendRot
|
|
"""
|
|
|
|
from __future__ import print_function
|
|
from errno import ENETRESET
|
|
import logging, sys, os, json
|
|
import CaChannel, time
|
|
import numpy as np
|
|
|
|
|
|
class PSSS:
|
|
def __init__(self, args):
|
|
# import pdb; pdb.set_trace()
|
|
# sys.exit()
|
|
# {'mode': 0, 'stdout': True, 'var': []}
|
|
# {'mode': 5, 'stdout': True, 'var': ['SARFE10-PSSS059']}
|
|
|
|
self.args = args
|
|
prefix = self.args.var[0]
|
|
prefix = prefix[0 : prefix.find("-") + 1]
|
|
self.prefix = prefix
|
|
# print('Prefix='+prefix)
|
|
self.pv = dict()
|
|
|
|
def getPv(self, name):
|
|
try:
|
|
pv = self.pv[name]
|
|
except KeyError:
|
|
prefix = self.prefix
|
|
pv = CaChannel.CaChannel(prefix + name)
|
|
pv.searchw()
|
|
self.pv[name] = pv
|
|
return pv
|
|
|
|
def moveLimit(self, m, val):
|
|
self.moveAbs(m, val)
|
|
|
|
def waitMotionDone(self, m):
|
|
s = m + ".DMOV"
|
|
pv = self.getPv(s)
|
|
sys.stdout.write("wait motion " + s + " ")
|
|
sys.stdout.flush()
|
|
while pv.getw() == 0:
|
|
sys.stdout.write(".")
|
|
sys.stdout.flush()
|
|
time.sleep(0.1)
|
|
print("done")
|
|
|
|
def setPos(self, m, ofs):
|
|
pvSET = self.getPv(m + ".SET")
|
|
pvVAL = self.getPv(m + ".VAL")
|
|
pvSET.putw(1) # Set
|
|
print("set ofset to %f" % ofs)
|
|
pvVAL.putw(ofs) # set value (without move)
|
|
time.sleep(0.1) # wait for safety
|
|
pvSET.putw(0) # Use
|
|
time.sleep(0.1) # wait for safety
|
|
|
|
def moveAbs(self, m, val):
|
|
pvVAL = self.getPv(m + ".VAL")
|
|
pvVAL.putw(val) # set position and move
|
|
|
|
def homing(self):
|
|
pvNameLst = (
|
|
"PSSS055:MOTOR_ROT_X1",
|
|
"PSSS055:MOTOR_X1",
|
|
"PSSS055:MOTOR_Y1",
|
|
"PSSS055:MOTOR_PROBE",
|
|
"PSSS059:MOTOR_X5",
|
|
"PSSS059:MOTOR_Y5",
|
|
"PSSS059:MOTOR_Z5",
|
|
"PSSS059:MOTOR_X3",
|
|
"PSSS059:MOTOR_Y3",
|
|
)
|
|
|
|
pvHOME = self.getPv("PSSS059:HOMING.VAL")
|
|
if pvHOME.getw(1) == 1:
|
|
print("homing still in progress. abort new procedure")
|
|
# return
|
|
pvHOME.putw(1) # homing in progress
|
|
|
|
try:
|
|
pvHomr = self.getPv(pvNameLst[0] + ".HOMR")
|
|
pvHomr.putw(1) # homing MOTOR_ROT_X1
|
|
self.waitMotionDone(pvNameLst[0])
|
|
|
|
homing = (
|
|
(1, 200, 10, 0), # PSSS055:MOTOR_X1 Home on +limit switch to +10mm
|
|
(2, 10, 4.475, 2.22), # PSSS055:MOTOR_Y1 Home on +limit switch to +4.475mm
|
|
(3, 50, 0, -9), # PSSS055:MOTOR_PROBE Home on +limit switch to +0mm
|
|
(4, 80, 35, 0), # PSSS059:MOTOR_X5 Home on +limit switch to +35mm
|
|
(5, 30, 10, 0), # PSSS059:MOTOR_Y5 Home on +limit switch to +10mm
|
|
# (6,20,8.9,0), # PSSS059:MOTOR_Z5 Home on +limit switch to +8.9mm Set HLM to 10, LLM to -2mm
|
|
(7, 30, 1, 0), # PSSS059:MOTOR_X3 Home on +limit switch to +10mm
|
|
(8, 30, 1, -1.4), # PSSS059:MOTOR_Y3 Home on +limit switch to +10mm
|
|
)
|
|
for idx, mvLim, setPos, mvPos in homing:
|
|
pvName = pvNameLst[idx]
|
|
print("homing %s %f %f %f" % (pvName, mvLim, setPos, mvPos))
|
|
self.moveLimit(pvName, mvLim)
|
|
|
|
for idx, mvLim, setPos, mvPos in homing:
|
|
pvName = pvNameLst[idx]
|
|
self.waitMotionDone(pvName)
|
|
self.setPos(pvName, setPos)
|
|
|
|
time.sleep(2)
|
|
print("sleep 2 sec.")
|
|
for idx, mvLim, setPos, mvPos in homing:
|
|
pvName = pvNameLst[idx]
|
|
self.moveAbs(pvName, mvPos)
|
|
|
|
for idx, mvLim, setPos, mvPos in homing:
|
|
pvName = pvNameLst[idx]
|
|
self.waitMotionDone(pvName)
|
|
|
|
except AssertionError as e: # BaseException as e:
|
|
print(e)
|
|
pvHOME.putw(3) # homing failed
|
|
else:
|
|
pvHOME.putw(2) # homing done
|
|
|
|
def set_energy_motor(self, energy2motor, scan=False, rotWait=False):
|
|
crystalType = self.getPv("PSSS059:CRYSTAL_SP").getw()
|
|
# crystalType=2
|
|
# print(crystalType)
|
|
# print(type(crystalType))
|
|
if crystalType == 0:
|
|
return
|
|
# 0 None
|
|
# 1 Si(111)R155
|
|
# 2 Si(220)R75
|
|
# 3 Si(220)R145
|
|
# 4 Si(220)R200
|
|
# 5 Si(333)R155
|
|
|
|
# load lut
|
|
fnXtlLst = (None, "Si111R155", "Si220R75", "Si220R145", "Si220R200", "Si333R155")
|
|
# print(__file__)
|
|
base = os.path.dirname(__file__)
|
|
fn = os.path.join(base, "lut" + fnXtlLst[crystalType] + ".txt")
|
|
lut = np.genfromtxt(fn, delimiter="\t", names=True)
|
|
# lut
|
|
# lut.dtype
|
|
# lut[1]['Energy']
|
|
# lut['Energy']
|
|
# lut=np.genfromtxt(fn, delimiter='\t',skip_header=1)
|
|
if energy2motor:
|
|
energy = self.getPv("PSSS059:ENERGY").getw()
|
|
# energy =6000
|
|
gratingType = self.getPv("PSSS055:GRATING_SP").getw()
|
|
# gratingType=3
|
|
camPosX = "CamPosX"
|
|
if gratingType in (1, 2):
|
|
camPosX += "_100nm"
|
|
elif gratingType == 3:
|
|
camPosX += "_150nm"
|
|
elif gratingType == 4:
|
|
camPosX += "_200nm"
|
|
|
|
camArmRot = np.interp(energy, lut["Energy"], lut["CamArmRot"])
|
|
cristBendRot = np.interp(energy, lut["Energy"], lut["CristBendRot"])
|
|
camPosX = np.interp(energy, lut["Energy"], lut[camPosX])
|
|
evPerPix = np.interp(energy, lut["Energy"], lut["EvPerPix"])
|
|
else:
|
|
camArmRot = self.getPv("PSSS059:MOTOR_ROT_X4").getw()
|
|
idx = ~np.isnan(lut["CamArmRot"])
|
|
lutCamArmRot = lut["CamArmRot"][idx]
|
|
energy = np.interp(camArmRot, lutCamArmRot[::-1], lut["Energy"][idx][::-1])
|
|
evPerPix = np.interp(camArmRot, lutCamArmRot[::-1], lut["EvPerPix"][idx][::-1])
|
|
|
|
# camera: 2560 x 2160
|
|
n = 2560
|
|
i = np.arange(n) - n / 2
|
|
spctr_x = energy + i * evPerPix
|
|
pv = self.getPv("PSSS059:SPECTRUM_X")
|
|
pv.putw(spctr_x)
|
|
|
|
pv = self.getPv("PSSS059:SPECTRUM_Y")
|
|
mu = 2560.0 / 2
|
|
sigma = 100.0
|
|
x = np.arange(2560.0)
|
|
spctr_y = 1000.0 * np.exp(-((x - mu) ** 2 / (2.0 * sigma**2)))
|
|
pv.putw(spctr_y)
|
|
|
|
if energy2motor:
|
|
print(
|
|
"energy2motor: camArmRot: %g cristBendRot: %g camPosX:%g evPerPix:%g"
|
|
% (camArmRot, cristBendRot, camPosX, evPerPix)
|
|
)
|
|
pv = self.getPv("PSSS059:MOTOR_ROT_X4")
|
|
pv.putw(camArmRot)
|
|
pv = self.getPv("PSSS059:MOTOR_ROT_X3")
|
|
pv.putw(cristBendRot)
|
|
if rotWait == True:
|
|
self.waitMotionDone("PSSS059:MOTOR_ROT_X3")
|
|
if scan == False: # if True the camera X position will not be changed
|
|
pv = self.getPv("PSSS059:MOTOR_X5")
|
|
pv.putw(camPosX)
|
|
else:
|
|
print("motor2energy: energy: %g evPerPix:%g" % (energy, evPerPix))
|
|
pv = self.getPv("PSSS059:ENERGY")
|
|
pv.putw(energy)
|
|
|
|
def grating2motor(self):
|
|
energy = self.getPv("PSSS059:ENERGY").getw()
|
|
gratingType = self.getPv("PSSS055:GRATING_SP").getw()
|
|
if gratingType == 0:
|
|
print("no grating")
|
|
girderX = 0.0
|
|
else:
|
|
base = os.path.dirname(__file__)
|
|
fn = fn = os.path.join(base, "lutGirderXTrans.txt")
|
|
lut = np.genfromtxt(fn, delimiter="\t", names=True)
|
|
|
|
d = {1: "100nm", 2: "100nm", 3: "150nm", 4: "200nm"}
|
|
print(gratingType)
|
|
grtStr = d[gratingType]
|
|
|
|
girderX = np.interp(energy, lut["Energy"], lut[grtStr])
|
|
|
|
print("girderX:%g" % (girderX))
|
|
pv = self.getPv("PSSS059:MOTOR_X2")
|
|
pv.putw(girderX)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
from optparse import OptionParser, IndentedHelpFormatter
|
|
|
|
class MyFormatter(IndentedHelpFormatter):
|
|
"helper class for formating the OptionParser"
|
|
|
|
def __init__(self):
|
|
IndentedHelpFormatter.__init__(self)
|
|
|
|
def format_epilog(self, epilog):
|
|
if epilog:
|
|
return epilog
|
|
else:
|
|
return ""
|
|
|
|
def parse_args():
|
|
"main command line interpreter function"
|
|
(h, t) = os.path.split(sys.argv[0])
|
|
cmd = "\n " + (t if len(h) > 3 else sys.argv[0]) + " "
|
|
exampleCmd = ("MYPREFIX",)
|
|
epilog = (
|
|
__doc__
|
|
+ """
|
|
Examples:"""
|
|
+ "".join(map(lambda s: cmd + s, exampleCmd))
|
|
+ "\n "
|
|
)
|
|
|
|
fmt = MyFormatter()
|
|
parser = OptionParser(epilog=epilog, formatter=fmt)
|
|
parser.add_option("-m", "--mode", type="int", help="move instead of homing", default=0)
|
|
parser.add_option("-s", "--stdout", action="store_true", help="log to stdout instead of file")
|
|
|
|
(args, other) = parser.parse_args()
|
|
# print(args,other)
|
|
args.var = other
|
|
# args.var=('SARFE10-',)
|
|
|
|
fn = (
|
|
"/afs/psi.ch/intranet/Controls/scratch/"
|
|
+ os.path.basename(__file__)
|
|
+ "_"
|
|
+ os.environ.get("USER")
|
|
+ ".log"
|
|
)
|
|
if not args.stdout:
|
|
print("output redirected to file:\n" + fn)
|
|
stdoutBak = sys.stdout
|
|
sys.stdout = open(fn, "a+")
|
|
print("*" * 30 + "\n" + time.ctime() + ": run on host:" + os.environ.get("HOSTNAME"))
|
|
print("Args:" + str(args) + " " + str(args.var))
|
|
sys.stdout.flush()
|
|
|
|
psss = PSSS(args)
|
|
if args.mode == 0:
|
|
psss.homing()
|
|
elif args.mode == 1:
|
|
psss.set_energy_motor(energy2motor=True)
|
|
elif args.mode == 2:
|
|
psss.set_energy_motor(energy2motor=False)
|
|
elif args.mode == 3:
|
|
psss.grating2motor()
|
|
elif args.mode == 4:
|
|
psss.set_energy_motor(energy2motor=True, scan=True)
|
|
elif args.mode == 5:
|
|
psss.set_energy_motor(energy2motor=True, scan=True, rotWait=True)
|
|
print("PSSS_motion done.")
|
|
|
|
return
|
|
|
|
# os.environ['EPICS_CA_ADDR_LIST']='localhost'
|
|
# os.environ['EPICS_CA_ADDR_LIST']='172.26.0.255 172.26.2.255 172.26.8.255 172.26.16.255 172.26.24.255 172.26.32.255 172.26.40.255 172.26.110.255 172.26.111.255 172.26.120.255 129.129.242.255 129.129.243.255'
|
|
|
|
parse_args()
|