#!/usr/bin/env python # *-----------------------------------------------------------------------* # | | # | Copyright (c) 2016 by Paul Scherrer Institute (http://www.psi.ch) | # | | # | Author Thierry Zamofing (thierry.zamofing@psi.ch) | # *-----------------------------------------------------------------------* ''' tuning functions for ESB-MX Modes: bit 0=1: record/plot current step bit 1=2: custom chirp record/plot for IdCmd->ActPos transfer function bit 1=4: custom chirp record/plot for DesPos->ActPos transfer function bit 2=8: plot the full bode recording bit 3=16: plot the full bode recording with an approximation model bit 4=32: plot all raw acquired data files bit 5=64: generate observer code (after files generated with matlab) -> check https://github.com/klauer/ppmac for fast data gathering server which supports phase gathering -> not yet compiling: /home/zamofing_t/Documents/prj/SwissFEL/PowerBrickInspector/ppmac/fast_gather BUT data acquired and stored in: /media/zamofing_t/DataUbuHD/VirtualBox/shared/data ''' import os, sys, json, time import numpy as np import matplotlib as mpl #mpl.use('GTKAgg') import matplotlib.pyplot as plt from scipy import signal sys.path.insert(0,os.path.expanduser('~/Documents/prj/SwissFEL/PBTools/')) #import pbtools.misc.pp_comm as pp_comm -> pp_comm.PPComm from pbtools.misc.pp_comm import PPComm,GpasciiChannel from pbtools.misc.gather import Gather from pbtools.misc.tuning import Tuning class MXTuning(Tuning): tuneDir='/opt/ppmac/tune/' def __init__(self,comm,gather): Tuning.__init__(self,comm,gather) self.homed=False def init_stage(self,fn=''): comm=self.comm if comm is None or self.homed or os.path.isfile(fn): return gpascii=comm.gpascii sys.stdout.write('homing stage');sys.stdout.flush() gpascii.send_line('enable plc1') time.sleep(.2) while True: act=gpascii.get_variable('Plc[1].Active',type_=int) if act==0: sys.stdout.write('\n') break sys.stdout.write('.');sys.stdout.flush() time.sleep(.2) self.homed=True def bode_model_plot(self, mot): self.bode_full_plot(mot,self.baseDir) fig=plt.gcf() _N=1.#E-3 # normalization factor: -> moves 3 decades to right but has factors around 1 # s -> ms # Hz -> kHz # rad/s -> rad/ms if mot==1: #identify matlab: k:0.671226 w0:134.705 damp:0.191004 mag1=0.671226 #10**(db_mag1/20) db_mag1=20*np.log10(mag1)#dB w1=134.705*_N #rad/sec f1=w1/2/np.pi; # ca. 6.5Hz T1=1/w1 d1=0.19 # daempfung =1 -> keine resonanz -> den1= np.poly1d([T1,1])**2 num1=np.poly1d([mag1]) den1 = np.poly1d([T1**2,2*T1*d1,1]) #reiner integrator: 30Hz=0dB -> k=30*2*pi=180 #num1=np.poly1d([120*120]) #den1 = np.poly1d([1,0,0]) #first resonance frequency f2=np.array([197,199]) d2=np.array([.02,.02])#daempfung w2=f2*2*np.pi*_N #rad/sec T2=1/w2 num2 = np.poly1d([T2[0]**2,2*T2[0]*d2[0],1]) den2 = np.poly1d([T2[1]**2,2*T2[1]*d2[1],1]) mdl= signal.lti(num2, den2) #num denum #bode(mdl) #current loop 2nd order approx #identification with matlab: k=1, w0=8725, d=0.75 dc=.75 # daempfung =1 -> keine resonanz -> den1= np.poly1d([T1,1])**2 wc=8725.*_N # rad/sec #...but phase lag seems to have earlier effect -> reduce wc wc*=.5 # rad/sec fc=wc/2/np.pi # ca 1388Hz Tc=1/wc numc = np.poly1d([1.]) denc = np.poly1d([Tc**2,2*Tc*dc,1]) num=num1*num2*numc#*num3 den=den1*den2*denc#*den3 mdl= signal.lti(num, den) #num denum print(num) print(den) print(mdl) d={'num':num.coeffs,'num1':num1.coeffs,'num2':num2.coeffs,'numc':numc.coeffs, 'den':den.coeffs,'den1':den1.coeffs,'den2':den2.coeffs,'denc':denc.coeffs} fn=os.path.join(self.baseDir,'model%d.mat'%mot) import scipy.io scipy.io.savemat(fn, mdict=d) print('save to matlab file:'+fn) elif mot==2: #identify matlab: k:1.7282 w0:51.069 damp:0.327613 mag1=1.7282 #10**(db_mag1/20) db_mag1=20*np.log10(mag1)#dB w1=51.069*_N #rad/sec f1=w1/2/np.pi; # ca. 6.5Hz T1=1/w1 d1=0.32 # daempfung =1 -> keine resonanz -> den1= np.poly1d([T1,1])**2 num1=np.poly1d([mag1]) den1 = np.poly1d([T1**2,2*T1*d1,1]) #resonance frequency f2=np.array([57.8,61.8]) d2=np.array([.08,.095])#daempfung w2=f2*2*np.pi #rad/sec T2=1/w2 num2 = np.poly1d([T2[0]**2,2*T2[0]*d2[0],1]) den2 = np.poly1d([T2[1]**2,2*T2[1]*d2[1],1]) mdl= signal.lti(num2, den2) #num denum #bode(mdl) #resonance frequency f3=np.array([136,148]) d3=np.array([.05,.05])#daempfung w3=f3*2*np.pi #rad/sec T3=1/w3 num3 = np.poly1d([T3[0]**2,2*T3[0]*d3[0],1]) den3 = np.poly1d([T3[1]**2,2*T3[1]*d3[1],1]) #mdl= signal.lti(num3, den3) #num denum #bode(mdl) #resonance frequency f4=np.array([410,417]) d4=np.array([.015,.015])#daempfung w4=f4*2*np.pi #rad/sec T4=1/w4 num4 = np.poly1d([T4[0]**2,2*T4[0]*d4[0],1]) den4 = np.poly1d([T4[1]**2,2*T4[1]*d4[1],1]) #mdl= signal.lti(num3, den3) #num denum #bode(mdl) #resonance frequency f5=np.array([230,233]) d5=np.array([.04,.04])#daempfung w5=f5*2*np.pi #rad/sec T5=1/w5 num5 = np.poly1d([T5[0]**2,2*T5[0]*d5[0],1]) den5 = np.poly1d([T5[1]**2,2*T5[1]*d5[1],1]) #current loop 2nd order approx #identification with matlab: k=1, w0=8725, d=0.75 dc=.75 # daempfung =1 -> keine resonanz -> den1= np.poly1d([T1,1])**2 wc=8725.*_N # rad/sec #...but phase lag seems to have earlier effect -> reduce wc wc*=.5 # rad/sec fc=wc/2/np.pi # ca 1388Hz Tc=1/wc numc = np.poly1d([1.]) denc = np.poly1d([Tc**2,2*Tc*dc,1]) num=num1*num2*num3*num4*num5*numc den=den1*den2*den3*den4*den5*denc mdl= signal.lti(num, den) #num denum print(num) print(den) print(mdl) d={'num':num.coeffs,'num1':num1.coeffs,'num2':num2.coeffs,'num3':num3.coeffs,'num4':num4.coeffs,'num5':num5.coeffs,'numc':numc.coeffs, 'den':den.coeffs,'den1':den1.coeffs,'den2':den2.coeffs,'den3':den3.coeffs,'den4':den4.coeffs,'den5':den5.coeffs,'denc':denc.coeffs} fn=os.path.join(self.baseDir,'model%d.mat'%mot) import scipy.io scipy.io.savemat(fn, mdict=d) print('save to matlab file:'+fn) bode(mdl) w=np.logspace(0,np.log10(2000),1000)*2*np.pi w,mag,phase = signal.bode(mdl,w) f=w/(2*np.pi) ax=fig.axes[0] ax.semilogx(f, mag,'-k',lw=1) # Bode magnitude plot ax=fig.axes[1] ax.semilogx(f, phase,'-k',lw=1) # Bode phase plot # tp print see also: print(np.poly1d([1,2,3], variable='s')), print(np.poly1d([1,2,3], r=True, variable='s')) def usr_servo_gen_code(self,fn='/tmp/ssc1.mat'): import scipy.io,re #the file ssc[1|2].mat has been generated with matlab: #[mot1, mot2]=identifyFxFyStage(); #[pb]=simFxFyStage(mot1); #[ssc]=StateSpaceControlDesign(mot1); mat=scipy.io.loadmat(fn) motid=int(re.search('(\d)\.mat',fn).group(1)) A=mat['Aoz'] B=mat['Boz'] C=mat['Coz'] D=mat['Doz'] V=mat['V'] u=('DesPos','IqMeas','IqVolts','ActPos') y=('obsvOut',) progSample='''double usr_servo_ctrl_{motid}(MotorData *Mptr) {{ pshm->P[2000]=pshm->P[2000]*.9999+abs(Mptr->PosError)*0.0001; //lowpass of Position error return pshm->ServoCtrl(Mptr); }}'''.format(motid=motid) prog='''double obsvr_servo_ctrl_{motid}(MotorData *Mptr) {{ //x[n+1]=A*x[n]+B*u //y=C*x[n]+D*x[n] //u=[{u}].T double x[{A.shape[0]}]; //new state static double _x[{A.shape[0]}]={{0,0,0,0}}; //old state //double {u}; // input values double {y},iqCmd; // output values double maxDac=Mptr->MaxDac; '''.format(motid=motid,A=A,xInit=','.join('0'*A.shape[0]),u=', '.join(u),y=', '.join(y)) s=' //input values\n' for i in range(len(u)): s+=' double {u}=Mptr->{u};\n'.format(u=u[i]) prog+=s+''' if (Mptr->ClosedLoop) { ''' s=' //x[n+1]=A*x[n]+B*u;\n' for i in range(A.shape[0]): s+=' x[%d]='%i for j in range(A.shape[1]): s+='%+28.22g*_x[%d]'%(A[i,j],j) for j in range(B.shape[0]): s+='%+28.22g*%s'%(B[i,j],u[j]) s+=';\n' prog+=s+'\n' s=' //y=C*x[n]+D*x[n];\n' for i in range(C.shape[0]): s+=' %s='%y[i] for j in range(C.shape[1]): s+='%+28.22g*_x[%d]'%(C[i,j],j) s+=';\n' prog+=s+'\n' prog+=''' iqCmd=DesPos*{V}-{y}; //return iqCmd; pshm->P[200{motid}]=iqCmd; //lowpass of Position error return pshm->ServoCtrl(Mptr); }} else {{ Mptr->Servo.Integrator=0.0; return 0.0; }} }}'''.format(V=V[0,0],y=y[0],motid=motid) hdr='''double obsvr_servo_ctrl_{motid}(MotorData *Mptr); EXPORT_SYMBOL(obsvr_servo_ctrl_{motid});'''.format(motid=motid) return (hdr,prog) def check_fast_stage(self,file='/tmp/gather.npz'): if os.path.isfile(file): f=np.load(file) data=f['data'] meta=f['meta'].item() meta['file']=file else: #self.init_stage();time sleep phase=False motor=2 gpascii = self.comm.gpascii gt = self.gather gt.set_phasemode(phase) address=('Motor[2].ActPos','Motor[8].ActPos') gt.set_address(*address) gt.set_property(Period=1) tSrv=gpascii.servo_period tSrv=2E-4 # seconds phOsv = gpascii.get_variable('sys.PhaseOverServoPeriod', float) subs={'prgId':999,'mot':motor,'num':100,'phase':'Phase' if phase else ''} #the servoloop is called 2 times per servo cycle ?!? #don't know why, but this is the reason why the value L10 is incremented by 0.5 prog = ''' &1 open prog {prgId} P1=Motor[{mot}].DesPos-Motor[{mot}].HomePos Q2=10 Gather.{phase}Enable=2 Q1={num} while (Q1>0) {{ jog{mot}=(P1-1000) dwell 10 jog{mot}=(P1) dwell 10 Q1=Q1-1 }} dwell 10 Gather.{phase}Enable=0 close &1 b{prgId}r '''.format(**subs) gpascii.send_line(prog) res=gpascii.sync() res=res.replace(GpasciiChannel.ACK+'\r\n','') print(res) t=time.time() gt.wait_stopped() print('time %f'%(time.time()-t)) self.data=data=gt.upload() meta={'motor':motor,'date':time.asctime(),'ts':tSrv if not phase else tSrv*phOsv,'address':address} np.savez_compressed(file, data=data, meta=meta) meta['file'] = file tSrv=meta['ts'] t=tSrv*np.arange(data.shape[0]) data=data-data[0,:] plt.plot(t,data[:,0],t,data[:,1]) plt.figure() plt.plot(t,data[:,0]-data[:,1]) plt.show() def run(self,mode): #plt.ion() if mode&1: # full recording current step plt.close('all') self.homed=False for mot in (1, 2): fn=os.path.join(self.baseDir, 'curr_step%d.npz' % mot) self.init_stage(fn) self.bode_current(motor=mot, magMove=1000, magPhase=500, dwell=10, file=fn) plt.show(block=False) f=np.load(fn) fn=fn[:-3]+'mat' import scipy.io scipy.io.savemat(fn, mdict=f) print('save to matlab file:'+fn) if mode&2: plt.close('all') self.homed=False motLst = (1, 2) # (2,)# #recType: # IA 0 IqCmd,ActPos (for plant transfer function) # DA 1 DesPos,ActPos (for regulation transfer function) # all 2 DesPos,ActPos,IqCmd,IqMeas,IqVolts (for current, plant and regulation transfer function) #>>>>> IqCmd->ActPos transfer function (plant) using closed loop for low frequencies for mot in motLst: fn = os.path.join(self.baseDir, 'chirp_IA_%da.npz' % mot) self.init_stage(fn) self.custom_chirp(motor=mot, minFrq=1, maxFrq=15, amp=1000, tSec=15, recType=0,openLoop=False, file=fn) self.homed=False #>>>>> IqCmd->ActPos transfer function (plant) using open loop for high frequencies for ext,amp,minFrq,maxFrq,tSec in (('b', 10, 10, 100*1.5, 30), ('c', 50, 100, 300*1.5, 30), ('d', 50, 300, 1000*1.5, 10), ('e', 100, 1000, 2000, 10)): self.homed=False for mot in motLst: fn = os.path.join(self.baseDir, 'chirp_IA_%d%s.npz' % (mot,ext)) self.init_stage(fn) self.custom_chirp(motor=mot, minFrq=minFrq, maxFrq=maxFrq, amp=amp, tSec=tSec, recType=0,openLoop=True, file=fn) if mode&4: plt.close('all') self.homed = False motLst = (1, 2) # (2,)# #>>>>> desPos->actPos transfer function (regulation) using closed loop #motor1: 0dB at 20.4 Hz #motor2: 0dB at 11.3 Hz #1000um ampl. 15Hz ca. 2A current #100um ampl. 15Hz ca. 200mA current #100um at 11.3 Hz needs 100mA ->2A at sqrt(20)*11.3 =50Hz #20um at 11.3 Hz needs 20mA ->2A at sqrt(100)*11.3 =113Hz # 5um at 11.3 Hz needs 5mA ->2A at sqrt(2000/)*11.3 =226Hz # 1um at 11.3 Hz needs 1mA ->2A at sqrt(2000)*11.3 =505Hz #n times freq, -> n^2 current for ext,amp,minFrq,maxFrq,tSec in (('a', 100, 1, 30*1.5, 10), ('b', 20, 30, 75*1.5, 15), ('c', 5, 75, 150*1.5, 5), ('d', 1, 150, 750, 5)): self.homed=False for mot in motLst: fn = os.path.join(self.baseDir, 'chirp_DA_%d%s.npz' % (mot,ext)) self.init_stage(fn) self.custom_chirp(motor=mot, minFrq=minFrq, maxFrq=maxFrq, amp=amp, tSec=tSec, recType=1,openLoop=False, file=fn) #>>>>> all data for different transfer function for ext,amp,minFrq,maxFrq,tSec in (('a', 5, 10, 250, 10),): self.homed=False for mot in motLst: fn = os.path.join(self.baseDir, 'chirp_all_%d%s.npz' % (mot,ext)) self.init_stage(fn) self.custom_chirp(motor=mot, minFrq=minFrq, maxFrq=maxFrq, amp=amp, tSec=tSec, recType=2,openLoop=False, file=fn) if os.path.isfile(fn): f = np.load(fn) data = f['data'] meta = f['meta'].item() meta['file'] = file if len(meta['address']) == 4: for xy in ((0, 1), (0, 3), (2, 3)): self.bode_plot(data, xy=xy, mode=25, **meta) if mode&8: #plot the full bode recording plt.close('all') self.bode_full_plot(mot=1,base=self.baseDir) self.bode_full_plot(mot=2,base=self.baseDir) if mode&16: #plot the full bode recording with an approximation model plt.close('all') self.bode_model_plot(mot=1) self.bode_model_plot(mot=2) if mode&32: # plot all raw acquired data files # display bode plots import glob for fn in glob.glob(os.path.join(self.baseDir,'*.npz')): fh = np.load(fn) meta = fh['meta'].item() data = fh['data'] self.bode_plot(data, mode=25, **meta) if mode&64: #generater code #before this can be done, the observer controller has to be designed with matlab: #s.a.ESB_MX/matlab/Readme.md #clear; #clear global; #close all; #[mot1,mot2]=identifyFxFyStage(); #[pb]=simFxFyStage(mot1); #[ssc]=StateSpaceControlDesign(mot1); #[pb]=simFxFyStage(mot2); #[ssc]=StateSpaceControlDesign(mot2); #after this go to: python/usr_code and call make to build the controller #to activate the controller checkout: PBTools/pbtools/usr_servo_phase base=os.path.dirname(__file__) (hdr1,prog1)=self.usr_servo_gen_code('/tmp/ssc1.mat') (hdr2,prog2)=self.usr_servo_gen_code('/tmp/ssc2.mat') fn_ct=os.path.join(base,'usr_code/usrcode_template.c') fn_ht=os.path.join(base,'usr_code/usrcode_template.h') fnc=os.path.join(base,'usr_code/usrcode.c') fnh=os.path.join(base,'usr_code/usrcode.h') s=open(fn_ht).read() s=s.replace('',hdr1+'\n\n'+hdr2) fh=open(fnh,'w') fh.write(s) fh.close() print(fnh+' generated.') s=open(fn_ct).read() s=s.replace('',prog1+'\n\n'+prog2) fh=open(fnc,'w') fh.write(s) fh.close() print(fnc+' generated.') print('now compile it looking at PBTools/pbtools/usr_servo_phase/usrServoSample') print('done') plt.show() def bode(mdl): w,mag,phase = signal.bode(mdl,1000) f=w/(2*np.pi) fig = plt.figure() ax = fig.add_subplot(2, 1, 1) ax.semilogx(f,mag,'-') # Bode magnitude plot ax.yaxis.set_label_text('dB ampl') plt.grid(True) ax = fig.add_subplot(2, 1, 2) ax.semilogx(f,phase,'-') # Bode magnitude plot ax.yaxis.set_label_text('phase') ax.xaxis.set_label_text('frequency [Hz]') plt.grid(True) #plt.show() if __name__=='__main__': from argparse import ArgumentParser,RawDescriptionHelpFormatter import logging logger = logging.getLogger(__name__) #logger = logging.getLogger('pbtools.misc.pp_comm') logger.setLevel(logging.DEBUG) logging.basicConfig(format=('%(asctime)s %(name)-12s ' '%(levelname)-8s %(message)s'), datefmt='%m-%d %H:%M', ) def parse_args(): 'main command line interpreter function' #usage: gpasciiCommunicator.py --host=PPMACZT84 myPowerBRICK.cfg (h, t)=os.path.split(sys.argv[0]);cmd='\n '+(t if len(h)>3 else sys.argv[0])+' ' exampleCmd=('-n', '-v15' ) epilog=__doc__+''' Examples:'''+''.join(map(lambda s:cmd+s, exampleCmd))+'\n ' parser=ArgumentParser(epilog=epilog,formatter_class=RawDescriptionHelpFormatter) parser.add_argument('--host', help='hostname', metavar='HOST', default='SAR-CPPM-EXPMX1') parser.add_argument('--mode', '-m', type=int, help='modes (see below)', default=1) parser.add_argument('--dir', help='dir', default='MXTuning') args=parser.parse_args() #plt.ion() #args.host='MOTTEST-CPPM-CRM0573' #args.host=None if args.host is None: comm=gt=None else: comm = PPComm(host=args.host) gt = Gather(comm) tune=MXTuning(comm,gt) tune.baseDir=args.dir assert(os.path.exists(tune.baseDir)) tune.run(args.mode) #------------------ Main Code ---------------------------------- #ssh_test()'/tmp/usrcode.c' ret=parse_args() exit(ret) #enable plc1 #./PBTuning.py --host SAR-CPPM-EXPMX1 --mode 1 --mot 1 --dir tmp #./PBTuning.py --host SAR-CPPM-EXPMX1 --mode 1 --mot 2 --dir tmp #-> at low frequencied the speed is too high and encoder looses steps #enable plc1 #AFTER each chirp measurement do enable plc1 again! #./PBTuning.py --host SAR-CPPM-EXPMX1 --mode 2 --mot 1 --dir tmp #./PBTuning.py --host SAR-CPPM-EXPMX1 --mode 2 --mot 2 --dir tmp #./PBTuning.py --host SAR-CPPM-EXPMX1 --plot tmp