#!/usr/bin/env python # *-----------------------------------------------------------------------* # | | # | Copyright (c) 2016 by Paul Scherrer Institute (http://www.psi.ch) | # | | # | Author Thierry Zamofing (thierry.zamofing@psi.ch) | # *-----------------------------------------------------------------------* ''' tuning functions for ESB-MX Modes: 1 full recording current step 2 full recording open loop 3 full recording closed loop 4 plot the full bode recording 5 plot the full bode recording with an approximation model 6 plot all raw acquired data files 7 custom chirp test 8 generate observer code (after files generated with matlab) -> check https://github.com/klauer/ppmac for fast data gathering server which supports phase gathering -> not yet compiling: /home/zamofing_t/Documents/prj/SwissFEL/PowerBrickInspector/ppmac/fast_gather BUT data acquired and stored in: /media/zamofing_t/DataUbuHD/VirtualBox/shared/data ''' import os, sys, json, time import numpy as np import matplotlib as mpl #mpl.use('GTKAgg') import matplotlib.pyplot as plt from scipy import signal sys.path.insert(0,os.path.expanduser('~/Documents/prj/SwissFEL/PBTools/')) #import pbtools.misc.pp_comm as pp_comm -> pp_comm.PPComm from pbtools.misc.pp_comm import PPComm,GpasciiChannel from pbtools.misc.gather import Gather from pbtools.misc.tuning import Tuning class MXTuning(Tuning): tuneDir='/opt/ppmac/tune/' def __init__(self,comm,gather): Tuning.__init__(self,comm,gather) def init_stage(self): comm=self.comm if comm is None: return gpascii=comm.gpascii sys.stdout.write('homing stage');sys.stdout.flush() gpascii.send_line('enable plc1') time.sleep(.2) while True: act=gpascii.get_variable('Plc[1].Active',type_=int) if act==0: sys.stdout.write('\n') break sys.stdout.write('.');sys.stdout.flush() time.sleep(.2) #Coord[%d].ProgActive if a running proc def bode_model_plot(self, mot,base): self.bode_full_plot(mot,base) fig=plt.gcf() _N=1.#E-3 # normalization factor: -> moves 3 decades to right but has factors around 1 # s -> ms # Hz -> kHz # rad/s -> rad/ms if mot==1: #identify matlab: k:0.671226 w0:134.705 damp:0.191004 mag1=0.671226 #10**(db_mag1/20) db_mag1=20*np.log10(mag1)#dB w1=134.705*_N #rad/sec f1=w1/2/np.pi; # ca. 6.5Hz T1=1/w1 d1=0.19 # daempfung =1 -> keine resonanz -> den1= np.poly1d([T1,1])**2 num1=np.poly1d([mag1]) den1 = np.poly1d([T1**2,2*T1*d1,1]) #reiner integrator: 30Hz=0dB -> k=30*2*pi=180 #num1=np.poly1d([120*120]) #den1 = np.poly1d([1,0,0]) #first resonance frequency f2=np.array([197,199]) d2=np.array([.02,.02])#daempfung w2=f2*2*np.pi*_N #rad/sec T2=1/w2 num2 = np.poly1d([T2[0]**2,2*T2[0]*d2[0],1]) den2 = np.poly1d([T2[1]**2,2*T2[1]*d2[1],1]) mdl= signal.lti(num2, den2) #num denum #bode(mdl) #current loop 2nd order approx #identification with matlab: k=1, w0=8725, d=0.75 dc=.75 # daempfung =1 -> keine resonanz -> den1= np.poly1d([T1,1])**2 wc=8725.*_N # rad/sec #...but phase lag seems to have earlier effect -> reduce wc wc*=.5 # rad/sec fc=wc/2/np.pi # ca 1388Hz Tc=1/wc numc = np.poly1d([1.]) denc = np.poly1d([Tc**2,2*Tc*dc,1]) num=num1*num2*numc#*num3 den=den1*den2*denc#*den3 mdl= signal.lti(num, den) #num denum print(num) print(den) print(mdl) d={'num':num.coeffs,'num1':num1.coeffs,'num2':num2.coeffs,'numc':numc.coeffs, 'den':den.coeffs,'den1':den1.coeffs,'den2':den2.coeffs,'denc':denc.coeffs} fn=os.path.join(base,'model%d.mat'%mot) import scipy.io scipy.io.savemat(fn, mdict=d) print('save to matlab file:'+fn) elif mot==2: #identify matlab: k:1.7282 w0:51.069 damp:0.327613 mag1=1.7282 #10**(db_mag1/20) db_mag1=20*np.log10(mag1)#dB w1=51.069*_N #rad/sec f1=w1/2/np.pi; # ca. 6.5Hz T1=1/w1 d1=0.32 # daempfung =1 -> keine resonanz -> den1= np.poly1d([T1,1])**2 num1=np.poly1d([mag1]) den1 = np.poly1d([T1**2,2*T1*d1,1]) #resonance frequency f2=np.array([57.8,61.8]) d2=np.array([.08,.095])#daempfung w2=f2*2*np.pi #rad/sec T2=1/w2 num2 = np.poly1d([T2[0]**2,2*T2[0]*d2[0],1]) den2 = np.poly1d([T2[1]**2,2*T2[1]*d2[1],1]) mdl= signal.lti(num2, den2) #num denum #bode(mdl) #resonance frequency f3=np.array([136,148]) d3=np.array([.05,.05])#daempfung w3=f3*2*np.pi #rad/sec T3=1/w3 num3 = np.poly1d([T3[0]**2,2*T3[0]*d3[0],1]) den3 = np.poly1d([T3[1]**2,2*T3[1]*d3[1],1]) #mdl= signal.lti(num3, den3) #num denum #bode(mdl) #resonance frequency f4=np.array([410,417]) d4=np.array([.015,.015])#daempfung w4=f4*2*np.pi #rad/sec T4=1/w4 num4 = np.poly1d([T4[0]**2,2*T4[0]*d4[0],1]) den4 = np.poly1d([T4[1]**2,2*T4[1]*d4[1],1]) #mdl= signal.lti(num3, den3) #num denum #bode(mdl) #resonance frequency f5=np.array([230,233]) d5=np.array([.04,.04])#daempfung w5=f5*2*np.pi #rad/sec T5=1/w5 num5 = np.poly1d([T5[0]**2,2*T5[0]*d5[0],1]) den5 = np.poly1d([T5[1]**2,2*T5[1]*d5[1],1]) #current loop 2nd order approx #identification with matlab: k=1, w0=8725, d=0.75 dc=.75 # daempfung =1 -> keine resonanz -> den1= np.poly1d([T1,1])**2 wc=8725.*_N # rad/sec #...but phase lag seems to have earlier effect -> reduce wc wc*=.5 # rad/sec fc=wc/2/np.pi # ca 1388Hz Tc=1/wc numc = np.poly1d([1.]) denc = np.poly1d([Tc**2,2*Tc*dc,1]) num=num1*num2*num3*num4*num5*numc den=den1*den2*den3*den4*den5*denc mdl= signal.lti(num, den) #num denum print(num) print(den) print(mdl) d={'num':num.coeffs,'num1':num1.coeffs,'num2':num2.coeffs,'num3':num3.coeffs,'num4':num4.coeffs,'num5':num5.coeffs,'numc':numc.coeffs, 'den':den.coeffs,'den1':den1.coeffs,'den2':den2.coeffs,'den3':den3.coeffs,'den4':den4.coeffs,'den5':den5.coeffs,'denc':denc.coeffs} fn=os.path.join(base,'model%d.mat'%mot) import scipy.io scipy.io.savemat(fn, mdict=d) print('save to matlab file:'+fn) bode(mdl) w=np.logspace(0,np.log10(2000),1000)*2*np.pi w,mag,phase = signal.bode(mdl,w) f=w/(2*np.pi) ax=fig.axes[0] ax.semilogx(f, mag,'-k',lw=1) # Bode magnitude plot ax=fig.axes[1] ax.semilogx(f, phase,'-k',lw=1) # Bode phase plot # tp print see also: print(np.poly1d([1,2,3], variable='s')), print(np.poly1d([1,2,3], r=True, variable='s')) def usr_servo_gen_code(self,fn='/tmp/ssc1.mat'): import scipy.io,re #the file ssc[1|2].mat has been generated with matlab: #[mot1, mot2]=identifyFxFyStage(); #[pb]=simFxFyStage(mot1); #[ssc]=StateSpaceControlDesign(mot1); mat=scipy.io.loadmat(fn) motid=int(re.search('(\d)\.mat',fn).group(1)) A=mat['Aoz'] B=mat['Boz'] C=mat['Coz'] D=mat['Doz'] V=mat['V'] u=('DesPos','IqMeas','IqVolts','ActPos') y=('obsvOut',) progSample='''double usr_servo_ctrl_{motid}(MotorData *Mptr) {{ pshm->P[2000]=pshm->P[2000]*.9999+abs(Mptr->PosError)*0.0001; //lowpass of Position error return pshm->ServoCtrl(Mptr); }}'''.format(motid=motid) prog='''double obsvr_servo_ctrl_{motid}(MotorData *Mptr) {{ //x[n+1]=A*x[n]+B*u //y=C*x[n]+D*x[n] //u=[{u}].T double x[{A.shape[0]}]; //new state static double _x[{A.shape[0]}]={{0,0,0,0}}; //old state //double {u}; // input values double {y},iqCmd; // output values double maxDac=Mptr->MaxDac; '''.format(motid=motid,A=A,xInit=','.join('0'*A.shape[0]),u=', '.join(u),y=', '.join(y)) s=' //input values\n' for i in range(len(u)): s+=' double {u}=Mptr->{u};\n'.format(u=u[i]) prog+=s+''' if (Mptr->ClosedLoop) { ''' s=' //x[n+1]=A*x[n]+B*u;\n' for i in range(A.shape[0]): s+=' x[%d]='%i for j in range(A.shape[1]): s+='%+28.22g*_x[%d]'%(A[i,j],j) for j in range(B.shape[0]): s+='%+28.22g*%s'%(B[i,j],u[j]) s+=';\n' prog+=s+'\n' s=' //y=C*x[n]+D*x[n];\n' for i in range(C.shape[0]): s+=' %s='%y[i] for j in range(C.shape[1]): s+='%+28.22g*_x[%d]'%(C[i,j],j) s+=';\n' prog+=s+'\n' prog+=''' iqCmd=DesPos*{V}-{y}; if (iqCmd>maxDac) {{ iqCmd=maxDac; }} else {{ if (iqCmd<-maxDac) {{ iqCmd=-maxDac; }} }} //return iqCmd; pshm->P[200{motid}]=iqCmd; //lowpass of Position error return pshm->ServoCtrl(Mptr); }} else {{ Mptr->Servo.Integrator=0.0; return 0.0; }} }}'''.format(V=V[0,0],y=y[0],motid=motid) hdr='''double obsvr_servo_ctrl_{motid}(MotorData *Mptr); EXPORT_SYMBOL(obsvr_servo_ctrl_{motid});'''.format(motid=motid) return (hdr,prog) def check_fast_stage(self,file='/tmp/gather.npz'): if os.path.isfile(file): f=np.load(file) data=f['data'] meta=f['meta'].item() meta['file']=file else: #self.init_stage();time sleep phase=False motor=2 gpascii = self.comm.gpascii gt = self.gather gt.set_phasemode(phase) address=('Motor[2].ActPos','Motor[8].ActPos') gt.set_address(*address) gt.set_property(Period=1) tSrv=gpascii.servo_period tSrv=2E-4 # seconds phOsv = gpascii.get_variable('sys.PhaseOverServoPeriod', float) subs={'prgId':999,'mot':motor,'num':100,'phase':'Phase' if phase else ''} #the servoloop is called 2 times per servo cycle ?!? #don't know why, but this is the reason why the value L10 is incremented by 0.5 prog = ''' &1 open prog {prgId} P1=Motor[{mot}].DesPos-Motor[{mot}].HomePos Q2=10 Gather.{phase}Enable=2 Q1={num} while (Q1>0) {{ jog{mot}=(P1-1000) dwell 10 jog{mot}=(P1) dwell 10 Q1=Q1-1 }} dwell 10 Gather.{phase}Enable=0 close &1 b{prgId}r '''.format(**subs) gpascii.send_line(prog) res=gpascii.sync() res=res.replace(GpasciiChannel.ACK+'\r\n','') print(res) t=time.time() gt.wait_stopped() print('time %f'%(time.time()-t)) self.data=data=gt.upload() meta={'motor':motor,'date':time.asctime(),'ts':tSrv if not phase else tSrv*phOsv,'address':address} np.savez_compressed(file, data=data, meta=meta) meta['file'] = file tSrv=meta['ts'] t=tSrv*np.arange(data.shape[0]) data=data-data[0,:] plt.plot(t,data[:,0],t,data[:,1]) plt.figure() plt.plot(t,data[:,0]-data[:,1]) plt.show() def bode(mdl): w,mag,phase = signal.bode(mdl,1000) f=w/(2*np.pi) fig = plt.figure() ax = fig.add_subplot(2, 1, 1) ax.semilogx(f,mag,'-') # Bode magnitude plot ax.yaxis.set_label_text('dB ampl') plt.grid(True) ax = fig.add_subplot(2, 1, 2) ax.semilogx(f,phase,'-') # Bode magnitude plot ax.yaxis.set_label_text('phase') ax.xaxis.set_label_text('frequency [Hz]') plt.grid(True) #plt.show() if __name__=='__main__': from argparse import ArgumentParser,RawDescriptionHelpFormatter import logging logger = logging.getLogger(__name__) #logger = logging.getLogger('pbtools.misc.pp_comm') logger.setLevel(logging.DEBUG) logging.basicConfig(format=('%(asctime)s %(name)-12s ' '%(levelname)-8s %(message)s'), datefmt='%m-%d %H:%M', ) def parse_args(): 'main command line interpreter function' #usage: gpasciiCommunicator.py --host=PPMACZT84 myPowerBRICK.cfg (h, t)=os.path.split(sys.argv[0]);cmd='\n '+(t if len(h)>3 else sys.argv[0])+' ' exampleCmd=('-n', '-v15' ) epilog=__doc__+''' Examples:'''+''.join(map(lambda s:cmd+s, exampleCmd))+'\n ' parser=ArgumentParser(epilog=epilog,formatter_class=RawDescriptionHelpFormatter) parser.add_argument('--host', help='hostname', metavar='HOST', default='SAR-CPPM-EXPMX1') parser.add_argument('--mode', '-m', type=int, help='modes (see below)', default=1) parser.add_argument('--dir', help='dir', default=None) args=parser.parse_args() #plt.ion() #args.host='MOTTEST-CPPM-CRM0573' #args.host=None if args.host is None: comm=gt=None else: comm = PPComm(host=args.host) gt = Gather(comm) tune=MXTuning(comm,gt) base='MXTuning' if args.dir is not None: base=args.dir assert(os.path.exists(base)) #base=os.path.join(base,args.dir) #if not os.path.exists(base): # os.mkdir(base) mode=args.mode #plt.ion() if mode==1: # full recording current step homed=False for mot in (1, 2): fn=os.path.join(base, 'curr_step%d.npz' % mot) if not os.path.isfile(fn) and not homed: tune.init_stage();homed=True fig=plt.figure(mot) tune.bode_current(motor=mot, magMove=1000, magPhase=500, dwell=10, file=fn,fig=fig) plt.show(block=False) f=np.load(fn) fn=fn[:-3]+'mat' import scipy.io scipy.io.savemat(fn, mdict=f) print('save to matlab file:'+fn) elif mode==2: # full recording open loop for mot in (1, 2): fsin=os.path.join(base, 'sine_ol_mot%d.npz' % (mot)) fcrp=os.path.join(base, 'chirp_ol_mot%d' % (mot)) if not os.path.isfile(fsin): tune.init_stage();plt.close('all') tune.bode_sine(openloop=True, file=fsin, motor=mot) plt.show(block=False) for ext,amp,minFrq,maxFrq,tSec in (('a', 10, 10, 300,30), ('b', 50,100, 500,30), ('c', 50,300,1500,10), ('d',100,300,2000,10)): f=fcrp+ext+'.npz' if not os.path.isfile(f): tune.init_stage();plt.close('all') tune.bode_chirp(openloop=True, file=f, motor=mot, amp=amp, minFrq=minFrq, maxFrq=maxFrq, tSec=tSec) plt.show(block=False) elif mode==3: # full recording closed loop for mot in (1,2): #fsin=os.path.join(base, 'sine_cl_mot%d.npz' % (mot)) fcrp=os.path.join(base, 'chirp_cl_mot%d' % (mot)) #if not os.path.isfile(fsin): tune.init_stage();plt.close('all') #tune.bode_sine(openloop=False, file=fsin, motor=mot) #the generated program is prog 999 (only during acquisition) # Gather.MaxLines=174762 -> ca 15 sec. is max. #with higher frequency and amplitudes often is in DacLimit ! for ext,amp,minFrq,maxFrq,tSec in (('a',100, 1, 200,15), ('b', 20,10, 300,5), ('c', 5,10, 300,5), ('d', 1,10,1000,5), ): f=fcrp+ext+'.npz' if not os.path.isfile(f): tune.init_stage();plt.close('all') tune.bode_chirp(openloop=False, file=f, motor=mot, amp=amp, minFrq=minFrq, maxFrq=maxFrq, tSec=tSec) plt.show(block=False) elif mode==4: #plot the full bode recording tune.bode_full_plot(mot=1,base=base) tune.bode_full_plot(mot=2,base=base) elif mode==5: #plot the full bode recording with an approximation model tune.bode_model_plot(mot=1,base=base) tune.bode_model_plot(mot=2,base=base) elif mode == 6: # plot all raw acquired data files # display bode plots import glob for fn in glob.glob(os.path.join(base,'*.npz')): if os.path.basename(fn).startswith('sine_ol_mot'): tune.bode_sine(openloop=True, file=fn) if os.path.basename(fn).startswith('chirp_ol_mot'): tune.bode_chirp(openloop=True, file=fn) if os.path.basename(fn).startswith('sine_cl_mot'): tune.bode_sine(openloop=False, file=fn) if os.path.basename(fn).startswith('chirp_cl_mot'): tune.bode_chirp(openloop=False, file=fn) elif mode==7: #further tests fn='/tmp/cst_chirp0.npz' motLst=(1, 2)#(2,)# init_stage=False for mot in motLst: fn=os.path.join(base, 'cst_chirp_s_ol_%d.npz' % mot) if not init_stage and not os.path.isfile(fn):tune.init_stage();init_stage=True tune.custom_chirp(motor=mot,minFrq=100,maxFrq=1000,amp=50,tSec=15,mode=0,file=fn) init_stage=False for mot in motLst: fn=os.path.join(base, 'cst_chirp_s_cl1_%d.npz' % mot) if not init_stage and not os.path.isfile(fn):tune.init_stage();init_stage=True tune.custom_chirp(motor=mot,minFrq=1,maxFrq=100,amp=20,tSec=15,mode=1,file=fn) init_stage=False for mot in motLst: fn=os.path.join(base, 'cst_chirp_s_cl2a_%d.npz' % mot) if not init_stage and not os.path.isfile(fn):tune.init_stage();init_stage=True tune.custom_chirp(motor=mot,minFrq=1,maxFrq=30,amp=100,tSec=30,mode=2,file=fn) for mot in motLst: fn=os.path.join(base, 'cst_chirp_s_cl2b_%d.npz' % mot) if not init_stage and not os.path.isfile(fn):tune.init_stage();init_stage=True tune.custom_chirp(motor=mot,minFrq=20,maxFrq=150,amp=10,tSec=30,mode=2,file=fn) init_stage=False for mot in motLst: fn=os.path.join(base, 'cst_chirp_s_cl2c_%d.npz' % mot) if not init_stage and not os.path.isfile(fn):tune.init_stage();init_stage=True tune.custom_chirp(motor=mot,minFrq=100,maxFrq=300,amp=1,tSec=30,mode=2,file=fn) elif mode==8: #generater code #before this can be done, the observer controller has to be designed with matlab: #s.a.ESB_MX/matlab/Readme.md #clear; #clear global; #close all; #[mot1,mot2]=identifyFxFyStage(); #[pb]=simFxFyStage(mot1); #[ssc]=StateSpaceControlDesign(mot1); #[pb]=simFxFyStage(mot2); #[ssc]=StateSpaceControlDesign(mot2); #after this go to: python/usr_code and call make to build the controller #to activate the controller checkout: PBTools/pbtools/usr_servo_phase base=os.path.dirname(__file__) (hdr1,prog1)=tune.usr_servo_gen_code('/tmp/ssc1.mat') (hdr2,prog2)=tune.usr_servo_gen_code('/tmp/ssc2.mat') fn_ct=os.path.join(base,'usr_code/usrcode_template.c') fn_ht=os.path.join(base,'usr_code/usrcode_template.h') fnc=os.path.join(base,'usr_code/usrcode.c') fnh=os.path.join(base,'usr_code/usrcode.h') s=open(fn_ht).read() s=s.replace('',hdr1+'\n\n'+hdr2) fh=open(fnh,'w') fh.write(s) fh.close() print(fnh+' generated.') s=open(fn_ct).read() s=s.replace('',prog1+'\n\n'+prog2) fh=open(fnc,'w') fh.write(s) fh.close() print(fnc+' generated.') print('now compile it looking at PBTools/pbtools/usr_servo_phase/usrServoSample') elif mode==9: #SCRATCH motLst=(1, 2)#(2,)# init_stage=False for mot in motLst: fn=os.path.join(base, 'scratch_%d.npz' % mot) if not init_stage and not os.path.isfile(fn):tune.init_stage();init_stage=True tune.custom_chirp(motor=mot,minFrq=1,maxFrq=20,amp=200,tSec=15,mode=2,file=fn) elif mode==10: # record stage x and probe encoder tune.check_fast_stage() print('done') plt.show() #------------------ Main Code ---------------------------------- #ssh_test()'/tmp/usrcode.c' ret=parse_args() exit(ret) #enable plc1 #./PBTuning.py --host SAR-CPPM-EXPMX1 --mode 1 --mot 1 --dir tmp #./PBTuning.py --host SAR-CPPM-EXPMX1 --mode 1 --mot 2 --dir tmp #-> at low frequencied the speed is too high and encoder looses steps #enable plc1 #AFTER each chirp measurement do enable plc1 again! #./PBTuning.py --host SAR-CPPM-EXPMX1 --mode 2 --mot 1 --dir tmp #./PBTuning.py --host SAR-CPPM-EXPMX1 --mode 2 --mot 2 --dir tmp #./PBTuning.py --host SAR-CPPM-EXPMX1 --plot tmp