import sys import subprocess import pysdds import numpy as np import pandas as pd import io import matplotlib.pyplot as plt from argparse import ArgumentParser class Astra2ElegantConverter: def __init__(self): self.filename = 'convert.sdds' self.Q=None self.meanP=None def importAstraFile(self, input = ''): if len(input) < 1: print('ERROR: Astra file for input is not defined. Exiting...') return False status = subprocess.run(['astra2elegant',input,self.filename]) if (status.returncode != 0): print('Cannot convert Astra output distribution:', input) return False print('Astra particle file converted...') return True def applyWakes(self): # write lattice: if self.Q is None: return False fid = open('applywakes.lat','w') fid.write('Wake: Wake, INPUTFILE="scripts/wake_l_S-band.sdds",TCOLUMN="t",WCOLUMN="W",SMOOTHING=1\n') fid.write('Q: Charge, Total=%e\n' % self.Q) fid.write('Injector: Line = (Q,Wake)\n') fid.close() fid = open('applywakes.ele', 'w') fid.write('&run_setup\n') fid.write('lattice = applywakes.lat,\n') fid.write('use_beamline = Injector,\n') fid.write('rootname = applywakes,\n') fid.write('output = %s.out,\n') fid.write(' combine_bunch_statistics = 0,\n') fid.write('default_order = 2,\n') fid.write('concat_order = 0,\n') fid.write('print_statistics = 0,\n') fid.write('random_number_seed = 9876543210,\n') fid.write('p_central = %e,\n' % self.meanP) fid.write('tracking_updates = 1\n') fid.write('&end\n\n') fid.write('& run_control\n') fid.write('n_steps = 1,\n') fid.write('reset_rf_for_each_step = 1/0\n') fid.write('&end\n\n') fid.write('&sdds_beam\n') fid.write('input_type = "elegant",\n') fid.write('sample_interval = 1,\n') fid.write('input = convert.sdds,\n') fid.write('reuse_bunch = 0\n') fid.write('&end\n\n') fid.write('&track\n') fid.write('&end\n') fid.close() status = subprocess.run(['elegant','applywakes.ele']) if (status.returncode != 0): print('Cannot apply S-band Wakes with Elegant...') return False print('Injector S-band wakes applied...') return True def importElegantFile(self, input = 'convert.sdds'): dist = pysdds.read(input) self.Q = dist.par('Charge').data[0] self.x = dist.col('x').data[0] self.xp = dist.col('xp').data[0] self.y = dist.col('y').data[0] self.yp = dist.col('yp').data[0] self.t = dist.col('t').data[0] self.p = dist.col('p').data[0] self.t -= np.mean(self.t) self.meanP=np.mean(self.p) print('Particle distribution imported...') print(' Count: %d' % self.t.size) print(' Charge: %5.1f pC' % (self.Q*1e12)) print(' Mean Energy: %5.1f MeV' % (self.meanP*0.511)) return True def cutDistribution(self,sigma=0): if sigma <=0: return rmsT=np.std(self.t) idx=np.argwhere(np.abs(self.t)< rmsT*sigma) N0 = float(len(self.t)) self.x=self.x[idx] self.y=self.y[idx] self.xp=self.xp[idx] self.yp=self.yp[idx] self.p=self.p[idx] self.t=self.t[idx] N1 = float(len(self.t)) self.Q = self.Q*N1/N0 print('Particle distribution cut...') print(' Count: %d' % self.t.size) print(' Charge: %5.1f pC' % (self.Q * 1e12)) def analyseBeam(self): betay, alphay, emity = self.getTwiss(self.y, self.yp, self.p) betax, alphax, emitx = self.getTwiss(self.x, self.xp, self.p) print('Analysis of distribution:') print(' emit in x (nm): %5.1f' % (emitx*1e9)) print(' beta in x (m): %6.3f' % betax) print(' alpha in x (rad): %6.3f' % alphax) print(' emit in y (nm): %5.1f' % (emity * 1e9)) print(' beta in y (m): %6.3f' % betay) print(' alpha in y (rad): %6.3f' % alphay) def analyseBeamSlice(self,slice): dt = (np.max(self.t)-np.min(self.t))/slice tmin=np.min(self.t) ts = np.linspace(0.5*dt+tmin,(slice-0.5)*dt+tmin,num=slice) twiss=np.zeros((8,slice)) betay0, alphay0, emity0 = self.getTwiss(self.y, self.yp, self.p) betax0, alphax0, emitx0 = self.getTwiss(self.x, self.xp, self.p) gammax0 = (1+alphax0**2)/betax0 gammay0 = (1 + alphay0 ** 2) / betay0 for i,tt in enumerate(ts): idx=np.argwhere(np.abs(self.t-tt)<0.5*dt) betay, alphay, emity = self.getTwiss(self.y[idx], self.yp[idx], self.p[idx]) betax, alphax, emitx = self.getTwiss(self.x[idx], self.xp[idx], self.p[idx]) gammax = (1 + alphax ** 2) / betax gammay = (1 + alphay ** 2) / betay twiss[0, i] = betax twiss[1, i] = alphax twiss[2, i] = emitx twiss[3, i] = betay twiss[4, i] = alphay twiss[5, i] = emity twiss[6, i] = 0.5 * (betax * gammax0 + betax0 * gammax - 2 * alphax * alphax0) twiss[7, i] = 0.5 * (betay * gammay0 + betay0 * gammay - 2 * alphay * alphay0) plt.plot(ts*1e12, twiss[2, :]*1e9,label='x') plt.plot(ts*1e12, twiss[5, :]*1e9,label='y') plt.xlabel(r'$t$ (ps)') plt.ylabel(r'$\epsilon_n$ (nm)') plt.legend() plt.title('Slice Emittance') plt.show() plt.plot(ts * 1e12, twiss[6, :], label='x') plt.plot(ts * 1e12, twiss[7, :], label='y') plt.xlabel(r'$t$ (ps)') plt.ylabel(r'$\zeta$') plt.legend() plt.title(r'Slice Mismatch') plt.show() def getTwiss(self,x,xp,p): x1 = np.mean(x) x2= np.mean(x*x) xp1 = np.mean(xp) xp2 = np.mean(xp*xp) xxp1 = np.mean(x*xp) g1=np.mean(p) ex = np.sqrt((x2-x1*x1)*(xp2-xp1*xp1)-(xxp1-x1*xp1)**2)*g1 bx = (x2-x1*x1)*g1/ex ax = - (xxp1-x1*xp1)*g1/ex return bx,ax,ex def exportElegantDistribution(self,output = 'SF_input_dist.sdds'): if len(self.t.shape) > 1: meas_df = {'t': self.t[:, 0], 'p': self.p[:, 0], 'x': self.x[:, 0], 'xp': self.xp[:, 0], 'y': self.y[:, 0], 'yp': self.yp[:, 0], } else: meas_df = {'t': self.t[:], 'p': self.p[:], 'x': self.x[:], 'xp': self.xp[:], 'y': self.y[:], 'yp': self.yp[:], } df_meas = pd.DataFrame.from_dict(meas_df) parameters = {'Charge': [float(self.Q)]} subprocess.run(['rm',output]) sdds = pysdds.SDDSFile.from_df([df_meas], parameter_dict=parameters, mode='binary') sdds.validate_data() sdds.col('x').nm['units'] = 'm' sdds.col('y').nm['units'] = 'm' sdds.col('t').nm['units'] = 's' sdds.col('p').nm['units'] = 'm$be$nc' sdds.par('Charge').nm['units'] = 'C' pysdds.write(sdds, output) def plotLPS(self): plt.scatter(self.t*1e12, self.p*0.511, s=0.5) plt.xlabel(r'$t$ (ps)') plt.ylabel(r'$p$ (MeV)') plt.show() def cleanup(self,output='SF_input_dist.sdds'): subprocess.run(['mv','applywakes.out',output]) subprocess.run(['rm','applywakes.ele']) subprocess.run(['rm', 'applywakes.lat']) subprocess.run(['rm','convert.sdds']) def convertAstra2Elegant(input,output): a2e =Astra2ElegantConverter() if not a2e.importAstraFile(input = input): return False if not a2e.importElegantFile('convert.sdds'): return False if not a2e.applyWakes(): return False if not a2e.importElegantFile('applywakes.out'): return False a2e.analyseBeam() a2e.cleanup(output=output) return True def modifyElegantDist(input,output,cut,slice,scale): a2e = Astra2ElegantConverter() if not a2e.importElegantFile(input = input): return False a2e.cutDistribution(cut) a2e.plotLPS() if slice > 1: a2e.analyseBeamSlice(slice) a2e.exportElegantDistribution(output) a2e.importElegantFile(input=output) a2e.analyseBeam() return True if __name__ == '__main__': parser = ArgumentParser() parser.add_argument('-input', type=str, help='Astra Particle Distribution File Name', default='') parser.add_argument('-output', type=str, help='SDDS Output Distribution File Name', default='SF_input_dist.sdds') parser.add_argument('-cut', type=float, help='Cut particles beyond the value, given in units of rms duration', default=2.2) parser.add_argument('-nslice', type=int, help='Number of slices for analysis', default=20) parser.add_argument('-nsize', type=int, help='Number of particles after upscaling/downscaling', default=0) args = parser.parse_args() tmpfile = args.output+'.temp' status = convertAstra2Elegant(args.input, tmpfile) if not status: sys.exit(0) status = modifyElegantDist(tmpfile,args.output,args.cut,args.nslice,args.nsize) sys.exit(0)