#!/usr/bin/env python
# *-----------------------------------------------------------------------*
# |                                                                       |
# |  Copyright (c) 2017 by Paul Scherrer Institute (http://www.psi.ch)    |
# |                                                                       |
# |              Author Thierry Zamofing (thierry.zamofing@psi.ch)        |
# *-----------------------------------------------------------------------*
'''
tools to check kinematics
'''
import os
import sys
import json
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import mpl_toolkits.mplot3d as plt3d
import matplotlib.animation as anim
from matplotlib.widgets import Slider
import subprocess as sprc
from utilities import *
# NOTE: telnetlib was removed from the standard library in Python 3.13.
import telnetlib
# Imported after the star import so that 'time' is guaranteed to be the module.
import time


class GpasciiCommunicator():
    '''Communicates with the Delta Tau gpascii programm
    '''
    gpascii_ack = "\x06\r\n"
    gpascii_inp = 'Input\r\n'

    @staticmethod
    def _to_bytes(text):
        '''Return text as bytes for the telnet socket (bytes pass through unchanged).'''
        if isinstance(text, bytes):
            return text
        return text.encode('ascii')

    @staticmethod
    def _to_text(data):
        '''Return data received from the telnet socket as a native str.'''
        if isinstance(data, str):
            return data
        return data.decode('ascii', 'replace')

    def connect(self, host, username='root', password='deltatau', prompt='ppmac# ', verbose=0):
        '''Log in on host via telnet and start 'gpascii -2'.

        host:     telnet host to connect to
        username: login name sent after the 'login: ' prompt
        password: password sent after the 'Password: ' prompt
        prompt:   shell prompt to wait for before starting gpascii
        verbose:  if true, everything received during the login is printed

        Returns the open telnetlib.Telnet object. The reads use no timeout, so
        the call blocks until each expected prompt has been received.
        '''
        p = telnetlib.Telnet(host)
        # (text to wait for, reply to send once it has been received)
        dialogue = (
            ('login: ', username + '\n'),
            ('Password: ', password + '\n'),
            (prompt, 'gpascii -2\n'),  # command prompt -> execute gpascii command
        )
        try:
            for expected, reply in dialogue:
                s = p.read_until(self._to_bytes(expected))
                if verbose:
                    print(self._to_text(s))
                p.write(self._to_bytes(reply))
            s = p.read_until(self._to_bytes(self.gpascii_inp))
            if verbose:
                print(self._to_text(s))
        except Exception:
            # do not leak the socket if the login dialogue fails
            p.close()
            raise
        return p


# ---------------------------------------------------------------------------
# Script snippets downloaded by CoordTrf.gen_code().
# NOTE(review): the line breaks inside these scripts are restored one statement
# per line, without indentation; the '//' comments and '!macro()' lines need
# them. Confirm the layout against the controller / gpasciiCommander.
# ---------------------------------------------------------------------------

# mode 0: motor configuration
_PRG_MOTOR_CONFIG = '''
$$$***
!common()
!encoder_sim(enc=1,tbl=1,mot=1)
!motor(mot=1,dirCur=0,contCur=100,peakCur=100,timeAtPeak=1,JogSpeed=8.,numPhase=3,invDir=True)
Motor[1].pLimits=0;Motor[1].AmpFaultLevel=0;Motor[1].pAmpEnable=0;Motor[1].pAmpFault=0
!encoder_sim(enc=2,tbl=2,mot=2)
!motor(mot=2,dirCur=0,contCur=100,peakCur=1000,timeAtPeak=1,JogSpeed=8.,numPhase=3,invDir=True)
Motor[2].pLimits=0;Motor[2].AmpFaultLevel=0;Motor[2].pAmpEnable=0;Motor[2].pAmpFault=0
!encoder_sim(enc=3,tbl=3,mot=3)
!motor(mot=3,dirCur=0,contCur=100,peakCur=1000,timeAtPeak=1,JogSpeed=8.,numPhase=3,invDir=True)
Motor[3].pLimits=0;Motor[3].AmpFaultLevel=0;Motor[3].pAmpEnable=0;Motor[3].pAmpFault=0
!encoder_sim(enc=4,tbl=4,mot=4)
!motor(mot=4,dirCur=0,contCur=100,peakCur=1000,timeAtPeak=1,JogSpeed=8.,numPhase=3,invDir=True)
Motor[4].pLimits=0;Motor[4].AmpFaultLevel=0;Motor[4].pAmpEnable=0;Motor[4].pAmpFault=0
!encoder_sim(enc=5,tbl=5,mot=5)
!motor(mot=5,dirCur=0,contCur=100,peakCur=1000,timeAtPeak=1,JogSpeed=8.,numPhase=3,invDir=True)
Motor[5].pLimits=0;Motor[5].AmpFaultLevel=0;Motor[5].pAmpEnable=0;Motor[5].pAmpFault=0
!encoder_sim(enc=6,tbl=6,mot=6)
!motor(mot=6,dirCur=0,contCur=100,peakCur=1000,timeAtPeak=1,JogSpeed=8.,numPhase=3,invDir=True)
Motor[6].pLimits=0;Motor[6].AmpFaultLevel=0;Motor[6].pAmpEnable=0;Motor[6].pAmpFault=0
!encoder_sim(enc=7,tbl=7,mot=7)
!motor(mot=7,dirCur=0,contCur=100,peakCur=1000,timeAtPeak=1,JogSpeed=8.,numPhase=3,invDir=True)
Motor[7].pLimits=0;Motor[7].AmpFaultLevel=0;Motor[7].pAmpEnable=0;Motor[7].pAmpFault=0
!encoder_sim(enc=8,tbl=8,mot=8)
!motor(mot=8,dirCur=0,contCur=100,peakCur=1000,timeAtPeak=1,JogSpeed=8.,numPhase=3,invDir=True)
Motor[8].pLimits=0;Motor[8].AmpFaultLevel=0;Motor[8].pAmpEnable=0;Motor[8].pAmpFault=0
Motor[3].JogSpeed=1000 //used for joging
Motor[3].MaxSpeed=1000 //used for motion program
#1..8j/
'''

# mode 1: gather setup and the motion programs 1 and 2
_PRG_MOTION = '''
#1..2j=0
Gather.Enable=0
Gather.Items=2
Gather.MaxSamples=1000000
Gather.Period=10
Gather.Addr[0]=Motor[1].ActPos.a
Gather.Addr[1]=Motor[2].ActPos.a
open prog 1
jog1=0
dwell100
P1000=0
nofrax
linearabs
Gather.Enable=2
Y2.0
dwell100
Y4.0
dwell100
Y6.0
dwell100
Y8.0
dwell100
Gather.Enable=0
P1000=1
close
open prog 2
jog1=0
dwell100
P1000=0
nofrax
Gather.Enable=2
pvt100abs
Y0:0
Y2:20
Y4:20
Y6:20
Y8:0
dwell100
Gather.Enable=0
P1000=1
close
'''

# mode 2: simple coordinate transformation (#1->Y, empty kinematic programs)
_PRG_SIMPLE_TRF = '''
//motors CX CZ RY FY
//        4  5  3  1
&1
a
#1->0
#2->0
#3->0
#4->0
#5->0
#6->0
#7->0
#8->0
#1->Y
open forward
close
open inverse
close
'''

# mode 3: forward/inverse kinematic programs (#1->I)
_PRG_KINEMATICS = '''
// Set the motors as inverse kinematic axes in CS 1
//motors CX CZ RY FY
//        4  5  3  1
&1
a
#1->0
#2->0
#3->0
#4->0
#5->0
#6->0
#7->0
#8->0
#1->I
open forward
define(q1='L1')
define(Y='C7')
//coord Y
send 1"forward D0: %f \\n",D0
if(D0>0) callsub 100
D0=$00000080; //B=$2 X=$40 Y=$80 Z=$100 hex(2+int('40',16)+int('80',16)+int('100',16)) -> 0x1c2
N100:
Y=q1
send 1"forward result %f\\n",Y
P1001+=1
close
open inverse
define(q1='L1', vq1='R1')
define(Y='C7',vY='C39')
//coord Y
send 1"inverse D0:%f _Y:%f:%f\\n",D0,Y,vY
//D0 is set to $000001c2
define(q1='L1')
q1=Y
if(D0>0)
{
vq1=vY // THIS LINE IS USED FOR PVT MOVE !!!
send 1"inverse result _q1:%f:%f\\n",q1,vq1
}
else
{
send 1"inverse result %f\\n",q1
}
P1002+=1
close
'''

# gen_code() mode number -> script snippet
_PRG_BY_MODE = {
    0: _PRG_MOTOR_CONFIG,
    1: _PRG_MOTION,
    2: _PRG_SIMPLE_TRF,
    3: _PRG_KINEMATICS,
}


class CoordTrf:
    '''Generates test programs, downloads them to a controller and plots the
    gathered data.

    Attributes are taken from the constructor keyword arguments. The methods
    read 'host' (target host or None), 'verbose' (bit mask, bit 4 prints the
    downloaded code) and 'rec' (2-D array of gathered samples, set by
    download_code when gathering).
    '''

    def __init__(self, **kwargs):
        '''Store every keyword argument as an attribute of the same name.'''
        # defaults so that download_code() works if they are not passed
        self.host = None
        self.verbose = 0
        for k, v in kwargs.items():
            setattr(self, k, v)

    def show_vel(self):
        '''Plot the sample-to-sample differences of self.rec, one curve per column.

        Each column is scaled by its largest absolute difference, so that all
        curves lie within [-1, 1]. Labels are available for 4 columns only.
        '''
        rec = self.rec
        y = np.diff(rec, axis=0)
        if y.size == 0:
            # fewer than two samples: there is no difference to plot
            return
        plt.figure()
        scl = np.maximum(y.max(0), -y.min(0))
        scl = np.where(scl == 0, 1, scl)  # replace 0 by 1 to avoid division by zero
        y /= scl
        x = np.arange(0, y.shape[0])
        lbl = ('cx', 'cz', 'w', 'fy')
        for i in range(y.shape[1]):
            plt.plot(x, y[:, i], label=lbl[i])
        plt.legend()

    def show_pos(self):
        '''Plot the columns of self.rec over the sample index.

        Every column gets its own y axis; the additional axes share the x axis
        and have their left spine shifted outwards. Colors and labels are
        available for 2 columns only. The figure is connected to onmove(),
        which reads the attributes 'obj' and 'data' stored on the figure.
        '''
        rec = self.rec
        y = rec
        fig = plt.figure(figsize=(20, 6))
        c = 'bg'
        lbl = ('q1', 'q2')
        dx = .1 / len(lbl)
        for i in range(rec.shape[1]):
            if i == 0:
                ax = fig.add_axes([.1, .05, .85, .90])
                axl = [ax, ]
            else:
                # overlay axes: only the (shifted) left spine stays visible
                ax = fig.add_axes(axl[0].get_position(), sharex=ax)
                ax.spines['top'].set_visible(False)
                ax.spines['bottom'].set_visible(False)
                ax.xaxis.set_visible(False)
                ax.patch.set_visible(False)
                ax.spines['left'].set_position(('axes', -dx * i))
                axl.append(ax)
            ax.set_ylabel(lbl[i], color=c[i])
            ax.tick_params(axis='y', colors=c[i])

        x = np.arange(0, y.shape[0])
        h = []
        for i in range(rec.shape[1]):
            h += axl[i].plot(x, y[:, i], c[i], label=lbl[i])
        plt.legend(handles=h)
        fig.canvas.mpl_connect('motion_notify_event', self.onmove)
        fig.obj = self
        fig.data = y

    @staticmethod
    def onmove(event):
        '''Mouse-move callback: show the sample below the cursor in the toolbar.

        event: matplotlib event; the data is read from event.canvas.figure.data
        '''
        # xdata is None outside of the axes; 0.0 is a valid position
        if event.xdata is None:
            return
        data = event.canvas.figure.data
        idx = int(round(event.xdata))
        if idx < 0 or idx >= data.shape[0]:
            return  # cursor is beside the recorded samples
        toolbar = event.canvas.toolbar
        if toolbar is None:
            return
        # gives '<idx>: q1:<v> q2:<v>' for the usual two columns
        msg = '%d:' % idx + ''.join(' q%d:%.3f' % (k + 1, v) for k, v in enumerate(data[idx, :]))
        toolbar.set_message(msg)

    @staticmethod
    def _run_quiet(argv):
        '''Run argv with all streams piped and return its exit status.'''
        p = sprc.Popen(argv, shell=False, stdin=sprc.PIPE, stdout=sprc.PIPE, stderr=sprc.PIPE)
        # communicate() drains the pipes; wait() alone can block on a full pipe
        p.communicate()
        return p.returncode

    @staticmethod
    def _wait_program_done(host):
        '''Poll P1000 over gpascii on host every 0.2s until it reads 1.'''
        ack = GpasciiCommunicator.gpascii_ack
        com = GpasciiCommunicator().connect(host, prompt='# ')
        try:
            sys.stdout.write('wait execution...'); sys.stdout.flush()
            while True:
                com.write(GpasciiCommunicator._to_bytes('P1000\n'))
                val = com.read_until(GpasciiCommunicator._to_bytes(ack))
                val = GpasciiCommunicator._to_text(val)
                # the value is the text after '=' without the trailing ack characters
                val = int(val[val.find('=') + 1:].rstrip(ack))
                if val == 1:
                    break
                time.sleep(.2)
                sys.stdout.write('.'); sys.stdout.flush()
        finally:
            com.close()

    def download_code(self, prg, file=None, gather=False):
        '''Write the program to file and download it to self.host.

        prg:    list of strings; they are joined with newlines
        file:   file name to write the program to (None: nothing is written)
        gather: if True, wait until P1000 reads 1, fetch the gathered data into
                self.rec and show the position and velocity plots

        Nothing is downloaded if self.host is None.
        Raises ValueError if a host is set but no file name is given.
        '''
        host = self.host
        if self.verbose & 4:
            for ln in prg:
                print(ln)
        if file is not None:
            with open(file, 'w') as fh:
                fh.write('\n'.join(prg))
        if host is None:
            return
        if file is None:
            raise ValueError('download_code: a file name is needed to download to host %s' % host)

        cmd = ['gpasciiCommander', '--host', host, file]
        print(' '.join(cmd))
        try:
            retval = sprc.call(cmd)
        except OSError as e:
            print('gpasciiCommander could not be started: %s' % e)
            return
        if retval:
            print('gpasciiCommander returned %d' % retval)

        if not gather:
            return
        # ***wait program finished P1000=1***
        self._wait_program_done(host)

        # gather -u /var/ftp/gather/out.txt
        fnRmt = '/var/ftp/gather/out.txt'
        fnLoc = '/tmp/gather.txt'
        print('\ngather data to %s...' % fnRmt)
        if self._run_quiet(('ssh', 'root@' + host, 'gather ', '-u', fnRmt)):
            print('ssh failed. ssh root@%s to open a session' % host)
            return
        print('transfer data to %s...' % fnLoc)
        if self._run_quiet(('scp', 'root@' + host + ':' + fnRmt, fnLoc)):
            # without this check an old /tmp/gather.txt would be plotted
            print('scp failed. could not copy %s from %s' % (fnRmt, host))
            return
        self.rec = np.genfromtxt(fnLoc, delimiter=' ')
        self.show_pos(); self.show_vel()
        plt.show()

    def gen_code(self, mode):
        '''Download the script snippet selected by mode as /tmp/prg<mode>.cfg.

        mode: 0 motor configuration
              1 gather setup and motion programs 1 and 2
              2 simple coordinate transformation
              3 forward/inverse kinematic programs

        The known modes are downloaded without gathering. Any other mode
        downloads an empty program with gathering enabled.
        '''
        fn = '/tmp/prg%d.cfg' % mode
        prg = []
        gather = True
        if mode in _PRG_BY_MODE:
            gather = False
            prg.append(_PRG_BY_MODE[mode])
        self.download_code(prg, fn, gather)

    def test(self):
        '''Download the programs, then run motion program 2 with the simple
        transformation and with the kinematic programs, gathering both runs.
        '''
        file = '/tmp/prg.cfg'
        #self.gen_code(0) #motor config
        self.gen_code(1)  # program code
        self.gen_code(2)  # simple coord trf
        #self.download_code(['&1;b1r',], file, True)
        self.download_code(['&1;b2r', ], file, True)

        self.gen_code(3)  # fwd/inv kinematics
        #prg.append(' Coord[1].AltFeedRate=0') # allow maximum speed
        #prg.append(' Coord[1].SegMoveTime=0') # turn off segmented mode
        #self.download_code(['&1;b1r',], file, True)
        #self.download_code(['Coord[1].AltFeedRate=100','Coord[1].SegMoveTime=0','&1;b2r',], file, True)
        self.download_code(['Coord[1].AltFeedRate=0', 'Coord[1].SegMoveTime=0', '&1;b2r', ], file, True)
        #self.download_code(['Coord[1].AltFeedRate=1','Coord[1].SegMoveTime=.1','&1;b2r',], file, True)
        #self.download_code(['Coord[1].AltFeedRate=0','Coord[1].SegMoveTime=.1','&1;b2r',], file, True)
        #self.download_code(['&1;b3r',], file, True)

# *** READ USER MANUAL P 455 ***
# Inverse-Kinematic Program for PVT Mode, No Segmentation
#


if __name__ == '__main__':
    ct = CoordTrf(verbose=255, host='MOTTEST-CPPM-CRM0485')
    ct.test()