Files
SwissFEL-Reference-Files/scripts/convertAstra2Elegant.py

251 lines
9.1 KiB
Python

import sys
import subprocess
import pysdds
import numpy as np
import pandas as pd
import io
import matplotlib.pyplot as plt
from argparse import ArgumentParser
class Astra2ElegantConverter:
def __init__(self):
self.filename = 'convert.sdds'
self.Q=None
self.meanP=None
def importAstraFile(self, input = ''):
if len(input) < 1:
print('ERROR: Astra file for input is not defined. Exiting...')
return False
status = subprocess.run(['astra2elegant',input,self.filename])
if (status.returncode != 0):
print('Cannot convert Astra output distribution:', input)
return False
print('Astra particle file converted...')
return True
def applyWakes(self):
# write lattice:
if self.Q is None:
return False
fid = open('applywakes.lat','w')
fid.write('Wake: Wake, INPUTFILE="scripts/wake_l_S-band.sdds",TCOLUMN="t",WCOLUMN="W",SMOOTHING=1\n')
fid.write('Q: Charge, Total=%e\n' % self.Q)
fid.write('Injector: Line = (Q,Wake)\n')
fid.close()
fid = open('applywakes.ele', 'w')
fid.write('&run_setup\n')
fid.write('lattice = applywakes.lat,\n')
fid.write('use_beamline = Injector,\n')
fid.write('rootname = applywakes,\n')
fid.write('output = %s.out,\n')
fid.write(' combine_bunch_statistics = 0,\n')
fid.write('default_order = 2,\n')
fid.write('concat_order = 0,\n')
fid.write('print_statistics = 0,\n')
fid.write('random_number_seed = 9876543210,\n')
fid.write('p_central = %e,\n' % self.meanP)
fid.write('tracking_updates = 1\n')
fid.write('&end\n\n')
fid.write('& run_control\n')
fid.write('n_steps = 1,\n')
fid.write('reset_rf_for_each_step = 1/0\n')
fid.write('&end\n\n')
fid.write('&sdds_beam\n')
fid.write('input_type = "elegant",\n')
fid.write('sample_interval = 1,\n')
fid.write('input = convert.sdds,\n')
fid.write('reuse_bunch = 0\n')
fid.write('&end\n\n')
fid.write('&track\n')
fid.write('&end\n')
fid.close()
status = subprocess.run(['elegant','applywakes.ele'])
if (status.returncode != 0):
print('Cannot apply S-band Wakes with Elegant...')
return False
print('Injector S-band wakes applied...')
return True
def importElegantFile(self, input = 'convert.sdds'):
dist = pysdds.read(input)
self.Q = dist.par('Charge').data[0]
self.x = dist.col('x').data[0]
self.xp = dist.col('xp').data[0]
self.y = dist.col('y').data[0]
self.yp = dist.col('yp').data[0]
self.t = dist.col('t').data[0]
self.p = dist.col('p').data[0]
self.t -= np.mean(self.t)
self.meanP=np.mean(self.p)
print('Particle distribution imported...')
print(' Count: %d' % self.t.size)
print(' Charge: %5.1f pC' % (self.Q*1e12))
print(' Mean Energy: %5.1f MeV' % (self.meanP*0.511))
return True
def cutDistribution(self,sigma=0):
if sigma <=0:
return
rmsT=np.std(self.t)
idx=np.argwhere(np.abs(self.t)< rmsT*sigma)
N0 = float(len(self.t))
self.x=self.x[idx]
self.y=self.y[idx]
self.xp=self.xp[idx]
self.yp=self.yp[idx]
self.p=self.p[idx]
self.t=self.t[idx]
N1 = float(len(self.t))
self.Q = self.Q*N1/N0
print('Particle distribution cut...')
print(' Count: %d' % self.t.size)
print(' Charge: %5.1f pC' % (self.Q * 1e12))
def analyseBeam(self):
betay, alphay, emity = self.getTwiss(self.y, self.yp, self.p)
betax, alphax, emitx = self.getTwiss(self.x, self.xp, self.p)
print('Analysis of distribution:')
print(' emit in x (nm): %5.1f' % (emitx*1e9))
print(' beta in x (m): %6.3f' % betax)
print(' alpha in x (rad): %6.3f' % alphax)
print(' emit in y (nm): %5.1f' % (emity * 1e9))
print(' beta in y (m): %6.3f' % betay)
print(' alpha in y (rad): %6.3f' % alphay)
def analyseBeamSlice(self,slice):
dt = (np.max(self.t)-np.min(self.t))/slice
tmin=np.min(self.t)
ts = np.linspace(0.5*dt+tmin,(slice-0.5)*dt+tmin,num=slice)
twiss=np.zeros((8,slice))
betay0, alphay0, emity0 = self.getTwiss(self.y, self.yp, self.p)
betax0, alphax0, emitx0 = self.getTwiss(self.x, self.xp, self.p)
gammax0 = (1+alphax0**2)/betax0
gammay0 = (1 + alphay0 ** 2) / betay0
for i,tt in enumerate(ts):
idx=np.argwhere(np.abs(self.t-tt)<0.5*dt)
betay, alphay, emity = self.getTwiss(self.y[idx], self.yp[idx], self.p[idx])
betax, alphax, emitx = self.getTwiss(self.x[idx], self.xp[idx], self.p[idx])
gammax = (1 + alphax ** 2) / betax
gammay = (1 + alphay ** 2) / betay
twiss[0, i] = betax
twiss[1, i] = alphax
twiss[2, i] = emitx
twiss[3, i] = betay
twiss[4, i] = alphay
twiss[5, i] = emity
twiss[6, i] = 0.5 * (betax * gammax0 + betax0 * gammax - 2 * alphax * alphax0)
twiss[7, i] = 0.5 * (betay * gammay0 + betay0 * gammay - 2 * alphay * alphay0)
plt.plot(ts*1e12, twiss[2, :]*1e9,label='x')
plt.plot(ts*1e12, twiss[5, :]*1e9,label='y')
plt.xlabel(r'$t$ (ps)')
plt.ylabel(r'$\epsilon_n$ (nm)')
plt.legend()
plt.title('Slice Emittance')
plt.show()
plt.plot(ts * 1e12, twiss[6, :], label='x')
plt.plot(ts * 1e12, twiss[7, :], label='y')
plt.xlabel(r'$t$ (ps)')
plt.ylabel(r'$\zeta$')
plt.legend()
plt.title(r'Slice Mismatch')
plt.show()
def getTwiss(self,x,xp,p):
x1 = np.mean(x)
x2= np.mean(x*x)
xp1 = np.mean(xp)
xp2 = np.mean(xp*xp)
xxp1 = np.mean(x*xp)
g1=np.mean(p)
ex = np.sqrt((x2-x1*x1)*(xp2-xp1*xp1)-(xxp1-x1*xp1)**2)*g1
bx = (x2-x1*x1)*g1/ex
ax = - (xxp1-x1*xp1)*g1/ex
return bx,ax,ex
def exportElegantDistribution(self,output = 'SF_input_dist.sdds'):
if len(self.t.shape) > 1:
meas_df = {'t': self.t[:, 0], 'p': self.p[:, 0], 'x': self.x[:, 0], 'xp': self.xp[:, 0],
'y': self.y[:, 0], 'yp': self.yp[:, 0], }
else:
meas_df = {'t': self.t[:], 'p': self.p[:], 'x': self.x[:], 'xp': self.xp[:],
'y': self.y[:], 'yp': self.yp[:], }
df_meas = pd.DataFrame.from_dict(meas_df)
parameters = {'Charge': [float(self.Q)]}
subprocess.run(['rm',output])
sdds = pysdds.SDDSFile.from_df([df_meas], parameter_dict=parameters, mode='binary')
sdds.validate_data()
sdds.col('x').nm['units'] = 'm'
sdds.col('y').nm['units'] = 'm'
sdds.col('t').nm['units'] = 's'
sdds.col('p').nm['units'] = 'm$be$nc'
sdds.par('Charge').nm['units'] = 'C'
pysdds.write(sdds, output)
def plotLPS(self):
plt.scatter(self.t*1e12, self.p*0.511, s=0.5)
plt.xlabel(r'$t$ (ps)')
plt.ylabel(r'$p$ (MeV)')
plt.show()
def cleanup(self,output='SF_input_dist.sdds'):
subprocess.run(['mv','applywakes.out',output])
subprocess.run(['rm','applywakes.ele'])
subprocess.run(['rm', 'applywakes.lat'])
subprocess.run(['rm','convert.sdds'])
def convertAstra2Elegant(input,output):
a2e =Astra2ElegantConverter()
if not a2e.importAstraFile(input = input):
return False
if not a2e.importElegantFile('convert.sdds'):
return False
if not a2e.applyWakes():
return False
if not a2e.importElegantFile('applywakes.out'):
return False
a2e.analyseBeam()
a2e.cleanup(output=output)
return True
def modifyElegantDist(input,output,cut,slice,scale):
a2e = Astra2ElegantConverter()
if not a2e.importElegantFile(input = input):
return False
a2e.cutDistribution(cut)
a2e.plotLPS()
if slice > 1:
a2e.analyseBeamSlice(slice)
a2e.exportElegantDistribution(output)
a2e.importElegantFile(input=output)
a2e.analyseBeam()
return True
if __name__ == '__main__':
parser = ArgumentParser()
parser.add_argument('-input', type=str, help='Astra Particle Distribution File Name', default='')
parser.add_argument('-output', type=str, help='SDDS Output Distribution File Name', default='SF_input_dist.sdds')
parser.add_argument('-cut', type=float, help='Cut particles beyond the value, given in units of rms duration', default=2.2)
parser.add_argument('-nslice', type=int, help='Number of slices for analysis', default=20)
parser.add_argument('-nsize', type=int, help='Number of particles after upscaling/downscaling', default=0)
args = parser.parse_args()
tmpfile = args.output+'.temp'
status = convertAstra2Elegant(args.input, tmpfile)
if not status:
sys.exit(0)
status = modifyElegantDist(tmpfile,args.output,args.cut,args.nslice,args.nsize)
sys.exit(0)