Files
PBSwissMX/python/coordTrfTest.py

367 lines
9.9 KiB
Python
Executable File

#!/usr/bin/env python
# *-----------------------------------------------------------------------*
# | |
# | Copyright (c) 2017 by Paul Scherrer Institute (http://www.psi.ch) |
# | |
# | Author Thierry Zamofing (thierry.zamofing@psi.ch) |
# *-----------------------------------------------------------------------*
'''
tools to check kinematics
'''
import os, sys, json
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import mpl_toolkits.mplot3d as plt3d
import matplotlib.animation as anim
from matplotlib.widgets import Slider
import subprocess as sprc
from utilities import *
import telnetlib
class GpasciiCommunicator():
'''Communicates with the Delta Tau gpascii programm
'''
gpascii_ack="\x06\r\n"
gpascii_inp='Input\r\n'
def connect(self, host, username='root', password='deltatau',prompt='ppmac# ',verbose=0):
p=telnetlib.Telnet(host)
s=p.read_until('login: ')
if verbose: print(s)
p.write(username+'\n')
s =p.read_until('Password: ')
if verbose: print(s)
p.write(password+'\n')
s =p.read_until(prompt) # command prompt
if verbose: print(s)
p.write('gpascii -2\n') # execute gpascii command
s=p.read_until(self.gpascii_inp)
if verbose: print(s)
return p
class CoordTrf:
def __init__(self,**kwargs):
for k,v in kwargs.iteritems():
setattr(self,k,v)
pass
def show_vel(self):
rec=self.rec
fig = plt.figure()
#y = np.diff(rec[:, 2])
y = np.diff(rec,axis=0)
mx=y.max(0)
mn=y.min(0)
scl=np.maximum(mx,-mn)
scl=np.where(scl==0, 1, scl) # replace 0 by 1
y/=scl
x = np.arange(0, y.shape[0]);
x= x.reshape(-1,1).dot(np.ones((1,y.shape[1])))
lbl=('cx','cz','w','fy')
#plt.plot(x, y,label=lbl)
for i in range(y.shape[1]):
plt.plot(x[:,i], y[:,i],label=lbl[i])
plt.legend()
def show_pos(self):
rec=self.rec
y = rec
#plt.ion()
fig = plt.figure(figsize=(20,6))
c='bg'
lbl=('q1','q2')
dx=.1/len(lbl)
for i in range(rec.shape[1]):
if i==0:
ax = fig.add_axes([.1, .05, .85, .90])
axl =[ax,]
else:
ax = fig.add_axes(axl[0].get_position(), sharex=ax)
ax.spines['top'].set_visible(False)
ax.spines['bottom'].set_visible(False)
ax.xaxis.set_visible(False)
ax.patch.set_visible(False)
ax.spines['left'].set_position(('axes', -dx*i))
axl.append(ax)
ax.set_ylabel(lbl[i], color=c[i])
ax.tick_params(axis='y', colors=c[i])
x = np.arange(0, y.shape[0]);
h=[]
for i in range(rec.shape[1]):
h+=axl[i].plot(x, y[:,i],c[i],label=lbl[i])
plt.legend(handles=h)
cid = fig.canvas.mpl_connect('motion_notify_event', self.onmove)
fig.obj=self
fig.data=y
@staticmethod
def onmove(event):
#print 'button=%s, x=%d, y=%d, xdata=%f, ydata=%f'%(
# event.button, event.x, event.y, event.xdata, event.ydata)
obj = event.canvas.figure.obj
data = event.canvas.figure.data
if(event.xdata):
idx=round(event.xdata)
msg='%d: q1:%.3f q2:%.3f'%((idx,)+tuple(data[idx,:]))
#print msg
event.canvas.toolbar.set_message(msg)
def download_code(self, prg, file=None, gather=False):
host=self.host
if self.verbose & 4:
for ln in prg:
print(ln)
if file is not None:
fh = open(file, 'w')
fh.write('\n'.join(prg))
fh.close()
if host is not None:
cmd = 'gpasciiCommander --host ' + host + ' ' + file
print(cmd)
p = sprc.Popen(cmd, shell=True) # , stdout=sprc.PIPE, stderr=sprc.STDOUT)
# res=p.stdout.readlines(); print res
retval = p.wait()
# gather -u /var/ftp/gather/out.txt
if gather:
# ***wait program finished P1000=1***
com=GpasciiCommunicator().connect(host,prompt='# ')
ack=GpasciiCommunicator.gpascii_ack
sys.stdout.write('wait execution...');sys.stdout.flush()
while(True):
#Gather.MaxLines calculates maximum numbewr of gathering into memory
com.write('P1000\n')
val=com.read_until(ack)
#print val
val=int(val[val.find('=')+1:].rstrip(ack))
if val==1:break
#com.write('Gather.Index\n')
#val=com.read_until(ack)
#print val
time.sleep(.2)
sys.stdout.write('.');sys.stdout.flush()
fnRmt = '/var/ftp/gather/out.txt'
fnLoc = '/tmp/gather.txt'
print('\ngather data to %s...' % fnRmt)
p = sprc.Popen(('ssh', 'root@' + host, 'gather ', '-u', fnRmt), shell=False, stdin=sprc.PIPE, stdout=sprc.PIPE,
stderr=sprc.PIPE)
res = p.wait()
if res:
print('ssh failed. ssh root@%s to open a session' % host)
return
print('transfer data to %s...' % fnLoc)
p = sprc.Popen(('scp', 'root@' + host + ':' + fnRmt, fnLoc), shell=False, stdin=sprc.PIPE, stdout=sprc.PIPE,
stderr=sprc.PIPE)
res = p.wait()
self.rec = np.genfromtxt(fnLoc, delimiter=' ')
#plt.ion()
self.show_pos();self.show_vel()
plt.show()
def gen_code(self, mode):
prg=[]
file='/tmp/prg%d.cfg'%mode
gather=True
if mode==0:
gather=False
prg.append('''
$$$***
!common()
!encoder_sim(enc=1,tbl=1,mot=1)
!motor(mot=1,dirCur=0,contCur=100,peakCur=100,timeAtPeak=1,JogSpeed=8.,numPhase=3,invDir=True)
Motor[1].pLimits=0;Motor[1].AmpFaultLevel=0;Motor[1].pAmpEnable=0;Motor[1].pAmpFault=0
!encoder_sim(enc=2,tbl=2,mot=2)
!motor(mot=2,dirCur=0,contCur=100,peakCur=1000,timeAtPeak=1,JogSpeed=8.,numPhase=3,invDir=True)
Motor[2].pLimits=0;Motor[2].AmpFaultLevel=0;Motor[2].pAmpEnable=0;Motor[2].pAmpFault=0
!encoder_sim(enc=3,tbl=3,mot=3)
!motor(mot=3,dirCur=0,contCur=100,peakCur=1000,timeAtPeak=1,JogSpeed=8.,numPhase=3,invDir=True)
Motor[3].pLimits=0;Motor[3].AmpFaultLevel=0;Motor[3].pAmpEnable=0;Motor[3].pAmpFault=0
!encoder_sim(enc=4,tbl=4,mot=4)
!motor(mot=4,dirCur=0,contCur=100,peakCur=1000,timeAtPeak=1,JogSpeed=8.,numPhase=3,invDir=True)
Motor[4].pLimits=0;Motor[4].AmpFaultLevel=0;Motor[4].pAmpEnable=0;Motor[4].pAmpFault=0
!encoder_sim(enc=5,tbl=5,mot=5)
!motor(mot=5,dirCur=0,contCur=100,peakCur=1000,timeAtPeak=1,JogSpeed=8.,numPhase=3,invDir=True)
Motor[5].pLimits=0;Motor[5].AmpFaultLevel=0;Motor[5].pAmpEnable=0;Motor[5].pAmpFault=0
!encoder_sim(enc=6,tbl=6,mot=6)
!motor(mot=6,dirCur=0,contCur=100,peakCur=1000,timeAtPeak=1,JogSpeed=8.,numPhase=3,invDir=True)
Motor[6].pLimits=0;Motor[6].AmpFaultLevel=0;Motor[6].pAmpEnable=0;Motor[6].pAmpFault=0
!encoder_sim(enc=7,tbl=7,mot=7)
!motor(mot=7,dirCur=0,contCur=100,peakCur=1000,timeAtPeak=1,JogSpeed=8.,numPhase=3,invDir=True)
Motor[7].pLimits=0;Motor[7].AmpFaultLevel=0;Motor[7].pAmpEnable=0;Motor[7].pAmpFault=0
!encoder_sim(enc=8,tbl=8,mot=8)
!motor(mot=8,dirCur=0,contCur=100,peakCur=1000,timeAtPeak=1,JogSpeed=8.,numPhase=3,invDir=True)
Motor[8].pLimits=0;Motor[8].AmpFaultLevel=0;Motor[8].pAmpEnable=0;Motor[8].pAmpFault=0
Motor[3].JogSpeed=1000 //used for joging
Motor[3].MaxSpeed=1000 //used for motion program
#1..8j/
''')
if mode==1:
gather=False
prg.append('''
#1..2j=0
Gather.Enable=0
Gather.Items=2
Gather.MaxSamples=1000000
Gather.Period=10
Gather.Addr[0]=Motor[1].ActPos.a
Gather.Addr[1]=Motor[2].ActPos.a
open prog 1
jog1=0
dwell100
P1000=0
nofrax
linearabs
Gather.Enable=2
Y2.0
dwell100
Y4.0
dwell100
Y6.0
dwell100
Y8.0
dwell100
Gather.Enable=0
P1000=1
close
open prog 2
jog1=0
dwell100
P1000=0
nofrax
Gather.Enable=2
pvt100abs
Y0:0
Y2:20
Y4:20
Y6:20
Y8:0
dwell100
Gather.Enable=0
P1000=1
close
''')
if mode==2:
gather=False
prg.append('''
//motors CX CZ RY FY
// 4 5 3 1
&1
a
#1->0
#2->0
#3->0
#4->0
#5->0
#6->0
#7->0
#8->0
#1->Y
open forward
close
open inverse
close
''')
if mode==3:
gather=False
prg.append('''
// Set the motors as inverse kinematic axes in CS 1
//motors CX CZ RY FY
// 4 5 3 1
&1
a
#1->0
#2->0
#3->0
#4->0
#5->0
#6->0
#7->0
#8->0
#1->I
open forward
define(q1='L1')
define(Y='C7')
//coord Y
send 1"forward D0: %f \\n",D0
if(D0>0) callsub 100
D0=$00000080; //B=$2 X=$40 Y=$80 Z=$100 hex(2+int('40',16)+int('80',16)+int('100',16)) -> 0x1c2
N100:
Y=q1
send 1"forward result %f\\n",Y
P1001+=1
close
open inverse
define(q1='L1', vq1='R1')
define(Y='C7',vY='C39')
//coord Y
send 1"inverse D0:%f _Y:%f:%f\\n",D0,Y,vY
//D0 is set to $000001c2
define(q1='L1')
q1=Y
if(D0>0)
{
vq1=vY // THIS LINE IS USED FOR PVT MOVE !!!
send 1"inverse result _q1:%f:%f\\n",q1,vq1
}
else
{
send 1"inverse result %f\\n",q1
}
P1002+=1
close
''')
self.download_code(prg, file, gather)
def test(self):
file='/tmp/prg.cfg'
#self.gen_code(0) #motor config
self.gen_code(1) #program code
self.gen_code(2) #simple coord trf
#self.download_code(['&1;b1r',], file, True)
self.download_code(['&1;b2r',], file, True)
self.gen_code(3) #fwd/inv kinematics
#prg.append(' Coord[1].AltFeedRate=0') # allow maximum speed
#prg.append(' Coord[1].SegMoveTime=0') # turn off segmented mode
#self.download_code(['&1;b1r',], file, True)
#self.download_code(['Coord[1].AltFeedRate=100','Coord[1].SegMoveTime=0','&1;b2r',], file, True)
self.download_code(['Coord[1].AltFeedRate=0','Coord[1].SegMoveTime=0','&1;b2r',], file, True)
#self.download_code(['Coord[1].AltFeedRate=1','Coord[1].SegMoveTime=.1','&1;b2r',], file, True)
#self.download_code(['Coord[1].AltFeedRate=0','Coord[1].SegMoveTime=.1','&1;b2r',], file, True)
#self.download_code(['&1;b3r',], file, True)
# *** READ USER MANUAL P 455 ***
# Inverse-Kinematic Program for PVT Mode, No Segmentation
#
if __name__=='__main__':
ct=CoordTrf(verbose=255,host='MOTTEST-CPPM-CRM0485')
ct.test()