251 lines
9.1 KiB
Python
251 lines
9.1 KiB
Python
import sys
|
|
import subprocess
|
|
import pysdds
|
|
import numpy as np
|
|
import pandas as pd
|
|
import io
|
|
import matplotlib.pyplot as plt
|
|
from argparse import ArgumentParser
|
|
|
|
class Astra2ElegantConverter:
|
|
def __init__(self):
|
|
self.filename = 'convert.sdds'
|
|
self.Q=None
|
|
self.meanP=None
|
|
|
|
|
|
def importAstraFile(self, input = ''):
|
|
if len(input) < 1:
|
|
print('ERROR: Astra file for input is not defined. Exiting...')
|
|
return False
|
|
status = subprocess.run(['astra2elegant',input,self.filename])
|
|
if (status.returncode != 0):
|
|
print('Cannot convert Astra output distribution:', input)
|
|
return False
|
|
print('Astra particle file converted...')
|
|
return True
|
|
|
|
def applyWakes(self):
|
|
# write lattice:
|
|
if self.Q is None:
|
|
return False
|
|
fid = open('applywakes.lat','w')
|
|
fid.write('Wake: Wake, INPUTFILE="scripts/wake_l_S-band.sdds",TCOLUMN="t",WCOLUMN="W",SMOOTHING=1\n')
|
|
fid.write('Q: Charge, Total=%e\n' % self.Q)
|
|
fid.write('Injector: Line = (Q,Wake)\n')
|
|
fid.close()
|
|
fid = open('applywakes.ele', 'w')
|
|
fid.write('&run_setup\n')
|
|
fid.write('lattice = applywakes.lat,\n')
|
|
fid.write('use_beamline = Injector,\n')
|
|
fid.write('rootname = applywakes,\n')
|
|
fid.write('output = %s.out,\n')
|
|
fid.write(' combine_bunch_statistics = 0,\n')
|
|
fid.write('default_order = 2,\n')
|
|
fid.write('concat_order = 0,\n')
|
|
fid.write('print_statistics = 0,\n')
|
|
fid.write('random_number_seed = 9876543210,\n')
|
|
fid.write('p_central = %e,\n' % self.meanP)
|
|
fid.write('tracking_updates = 1\n')
|
|
fid.write('&end\n\n')
|
|
fid.write('& run_control\n')
|
|
fid.write('n_steps = 1,\n')
|
|
fid.write('reset_rf_for_each_step = 1/0\n')
|
|
fid.write('&end\n\n')
|
|
fid.write('&sdds_beam\n')
|
|
fid.write('input_type = "elegant",\n')
|
|
fid.write('sample_interval = 1,\n')
|
|
fid.write('input = convert.sdds,\n')
|
|
fid.write('reuse_bunch = 0\n')
|
|
fid.write('&end\n\n')
|
|
fid.write('&track\n')
|
|
fid.write('&end\n')
|
|
fid.close()
|
|
|
|
status = subprocess.run(['elegant','applywakes.ele'])
|
|
if (status.returncode != 0):
|
|
print('Cannot apply S-band Wakes with Elegant...')
|
|
return False
|
|
print('Injector S-band wakes applied...')
|
|
return True
|
|
|
|
def importElegantFile(self, input = 'convert.sdds'):
|
|
|
|
dist = pysdds.read(input)
|
|
self.Q = dist.par('Charge').data[0]
|
|
self.x = dist.col('x').data[0]
|
|
self.xp = dist.col('xp').data[0]
|
|
self.y = dist.col('y').data[0]
|
|
self.yp = dist.col('yp').data[0]
|
|
self.t = dist.col('t').data[0]
|
|
self.p = dist.col('p').data[0]
|
|
self.t -= np.mean(self.t)
|
|
self.meanP=np.mean(self.p)
|
|
|
|
print('Particle distribution imported...')
|
|
print(' Count: %d' % self.t.size)
|
|
print(' Charge: %5.1f pC' % (self.Q*1e12))
|
|
print(' Mean Energy: %5.1f MeV' % (self.meanP*0.511))
|
|
return True
|
|
|
|
def cutDistribution(self,sigma=0):
|
|
if sigma <=0:
|
|
return
|
|
rmsT=np.std(self.t)
|
|
idx=np.argwhere(np.abs(self.t)< rmsT*sigma)
|
|
N0 = float(len(self.t))
|
|
self.x=self.x[idx]
|
|
self.y=self.y[idx]
|
|
self.xp=self.xp[idx]
|
|
self.yp=self.yp[idx]
|
|
self.p=self.p[idx]
|
|
self.t=self.t[idx]
|
|
N1 = float(len(self.t))
|
|
self.Q = self.Q*N1/N0
|
|
print('Particle distribution cut...')
|
|
print(' Count: %d' % self.t.size)
|
|
print(' Charge: %5.1f pC' % (self.Q * 1e12))
|
|
|
|
def analyseBeam(self):
|
|
betay, alphay, emity = self.getTwiss(self.y, self.yp, self.p)
|
|
betax, alphax, emitx = self.getTwiss(self.x, self.xp, self.p)
|
|
print('Analysis of distribution:')
|
|
print(' emit in x (nm): %5.1f' % (emitx*1e9))
|
|
print(' beta in x (m): %6.3f' % betax)
|
|
print(' alpha in x (rad): %6.3f' % alphax)
|
|
print(' emit in y (nm): %5.1f' % (emity * 1e9))
|
|
print(' beta in y (m): %6.3f' % betay)
|
|
print(' alpha in y (rad): %6.3f' % alphay)
|
|
|
|
def analyseBeamSlice(self,slice):
|
|
dt = (np.max(self.t)-np.min(self.t))/slice
|
|
tmin=np.min(self.t)
|
|
ts = np.linspace(0.5*dt+tmin,(slice-0.5)*dt+tmin,num=slice)
|
|
twiss=np.zeros((8,slice))
|
|
betay0, alphay0, emity0 = self.getTwiss(self.y, self.yp, self.p)
|
|
betax0, alphax0, emitx0 = self.getTwiss(self.x, self.xp, self.p)
|
|
gammax0 = (1+alphax0**2)/betax0
|
|
gammay0 = (1 + alphay0 ** 2) / betay0
|
|
for i,tt in enumerate(ts):
|
|
idx=np.argwhere(np.abs(self.t-tt)<0.5*dt)
|
|
betay, alphay, emity = self.getTwiss(self.y[idx], self.yp[idx], self.p[idx])
|
|
betax, alphax, emitx = self.getTwiss(self.x[idx], self.xp[idx], self.p[idx])
|
|
gammax = (1 + alphax ** 2) / betax
|
|
gammay = (1 + alphay ** 2) / betay
|
|
twiss[0, i] = betax
|
|
twiss[1, i] = alphax
|
|
twiss[2, i] = emitx
|
|
twiss[3, i] = betay
|
|
twiss[4, i] = alphay
|
|
twiss[5, i] = emity
|
|
twiss[6, i] = 0.5 * (betax * gammax0 + betax0 * gammax - 2 * alphax * alphax0)
|
|
twiss[7, i] = 0.5 * (betay * gammay0 + betay0 * gammay - 2 * alphay * alphay0)
|
|
|
|
|
|
plt.plot(ts*1e12, twiss[2, :]*1e9,label='x')
|
|
plt.plot(ts*1e12, twiss[5, :]*1e9,label='y')
|
|
plt.xlabel(r'$t$ (ps)')
|
|
plt.ylabel(r'$\epsilon_n$ (nm)')
|
|
plt.legend()
|
|
plt.title('Slice Emittance')
|
|
plt.show()
|
|
plt.plot(ts * 1e12, twiss[6, :], label='x')
|
|
plt.plot(ts * 1e12, twiss[7, :], label='y')
|
|
plt.xlabel(r'$t$ (ps)')
|
|
plt.ylabel(r'$\zeta$')
|
|
plt.legend()
|
|
plt.title(r'Slice Mismatch')
|
|
plt.show()
|
|
|
|
def getTwiss(self,x,xp,p):
|
|
x1 = np.mean(x)
|
|
x2= np.mean(x*x)
|
|
xp1 = np.mean(xp)
|
|
xp2 = np.mean(xp*xp)
|
|
xxp1 = np.mean(x*xp)
|
|
g1=np.mean(p)
|
|
ex = np.sqrt((x2-x1*x1)*(xp2-xp1*xp1)-(xxp1-x1*xp1)**2)*g1
|
|
bx = (x2-x1*x1)*g1/ex
|
|
ax = - (xxp1-x1*xp1)*g1/ex
|
|
|
|
return bx,ax,ex
|
|
|
|
def exportElegantDistribution(self,output = 'SF_input_dist.sdds'):
|
|
if len(self.t.shape) > 1:
|
|
meas_df = {'t': self.t[:, 0], 'p': self.p[:, 0], 'x': self.x[:, 0], 'xp': self.xp[:, 0],
|
|
'y': self.y[:, 0], 'yp': self.yp[:, 0], }
|
|
else:
|
|
meas_df = {'t': self.t[:], 'p': self.p[:], 'x': self.x[:], 'xp': self.xp[:],
|
|
'y': self.y[:], 'yp': self.yp[:], }
|
|
df_meas = pd.DataFrame.from_dict(meas_df)
|
|
parameters = {'Charge': [float(self.Q)]}
|
|
subprocess.run(['rm',output])
|
|
sdds = pysdds.SDDSFile.from_df([df_meas], parameter_dict=parameters, mode='binary')
|
|
sdds.validate_data()
|
|
sdds.col('x').nm['units'] = 'm'
|
|
sdds.col('y').nm['units'] = 'm'
|
|
sdds.col('t').nm['units'] = 's'
|
|
sdds.col('p').nm['units'] = 'm$be$nc'
|
|
sdds.par('Charge').nm['units'] = 'C'
|
|
pysdds.write(sdds, output)
|
|
|
|
def plotLPS(self):
|
|
plt.scatter(self.t*1e12, self.p*0.511, s=0.5)
|
|
plt.xlabel(r'$t$ (ps)')
|
|
plt.ylabel(r'$p$ (MeV)')
|
|
plt.show()
|
|
|
|
def cleanup(self,output='SF_input_dist.sdds'):
|
|
subprocess.run(['mv','applywakes.out',output])
|
|
subprocess.run(['rm','applywakes.ele'])
|
|
subprocess.run(['rm', 'applywakes.lat'])
|
|
subprocess.run(['rm','convert.sdds'])
|
|
|
|
|
|
def convertAstra2Elegant(input,output):
|
|
a2e =Astra2ElegantConverter()
|
|
if not a2e.importAstraFile(input = input):
|
|
return False
|
|
if not a2e.importElegantFile('convert.sdds'):
|
|
return False
|
|
if not a2e.applyWakes():
|
|
return False
|
|
if not a2e.importElegantFile('applywakes.out'):
|
|
return False
|
|
a2e.analyseBeam()
|
|
a2e.cleanup(output=output)
|
|
return True
|
|
|
|
|
|
def modifyElegantDist(input,output,cut,slice,scale):
|
|
a2e = Astra2ElegantConverter()
|
|
if not a2e.importElegantFile(input = input):
|
|
return False
|
|
a2e.cutDistribution(cut)
|
|
a2e.plotLPS()
|
|
if slice > 1:
|
|
a2e.analyseBeamSlice(slice)
|
|
a2e.exportElegantDistribution(output)
|
|
a2e.importElegantFile(input=output)
|
|
a2e.analyseBeam()
|
|
|
|
return True
|
|
if __name__ == '__main__':
|
|
|
|
parser = ArgumentParser()
|
|
parser.add_argument('-input', type=str, help='Astra Particle Distribution File Name', default='')
|
|
parser.add_argument('-output', type=str, help='SDDS Output Distribution File Name', default='SF_input_dist.sdds')
|
|
parser.add_argument('-cut', type=float, help='Cut particles beyond the value, given in units of rms duration', default=2.2)
|
|
parser.add_argument('-nslice', type=int, help='Number of slices for analysis', default=20)
|
|
parser.add_argument('-nsize', type=int, help='Number of particles after upscaling/downscaling', default=0)
|
|
|
|
args = parser.parse_args()
|
|
tmpfile = args.output+'.temp'
|
|
status = convertAstra2Elegant(args.input, tmpfile)
|
|
if not status:
|
|
sys.exit(0)
|
|
status = modifyElegantDist(tmpfile,args.output,args.cut,args.nslice,args.nsize)
|
|
|
|
|
|
sys.exit(0)
|