Files
SwissFEL-Reference-Files/scripts/convertAstra2Elegant.py
2026-03-04 15:28:18 +01:00

172 lines
6.0 KiB
Python

import sys
import subprocess
import pysdds
import numpy as np
import pandas as pd
import io
import matplotlib.pyplot as plt
from argparse import ArgumentParser
class Astra2ElegantConverter:
def __init__(self):
self.filename = 'convert.sdds'
self.Q=None
self.meanP=None
def importAstraFile(self, input = ''):
if len(input) < 1:
print('ERROR: Astra file for input is not defined. Exiting...')
return False
status = subprocess.run(['astra2elegant',input,self.filename])
if (status.returncode != 0):
print('Cannot convert Astra output distribution:', input)
return False
print('Astra particle file converted...')
return True
def applyWakes(self):
# write lattice:
if self.Q is None:
return False
fid = open('applywakes.lat','w')
fid.write('Wake: Wake, INPUTFILE="scripts/wake_l_S-band.sdds",TCOLUMN="t",WCOLUMN="W",SMOOTHING=1\n')
fid.write('Q: Charge, Total=%e\n' % self.Q)
fid.write('Injector: Line = (Q,Wake)\n')
fid.close()
fid = open('applywakes.ele', 'w')
fid.write('&run_setup\n')
fid.write('lattice = applywakes.lat,\n')
fid.write('use_beamline = Injector,\n')
fid.write('rootname = applywakes,\n')
fid.write('output = %s.out,\n')
fid.write(' combine_bunch_statistics = 0,\n')
fid.write('default_order = 2,\n')
fid.write('concat_order = 0,\n')
fid.write('print_statistics = 0,\n')
fid.write('random_number_seed = 9876543210,\n')
fid.write('p_central = %e,\n' % self.meanP)
fid.write('tracking_updates = 1\n')
fid.write('&end\n\n')
fid.write('& run_control\n')
fid.write('n_steps = 1,\n')
fid.write('reset_rf_for_each_step = 1/0\n')
fid.write('&end\n\n')
fid.write('&sdds_beam\n')
fid.write('input_type = "elegant",\n')
fid.write('sample_interval = 1,\n')
fid.write('input = convert.sdds,\n')
fid.write('reuse_bunch = 0\n')
fid.write('&end\n\n')
fid.write('&track\n')
fid.write('&end\n')
fid.close()
status = subprocess.run(['elegant','applywakes.ele'])
if (status.returncode != 0):
print('Cannot apply S-band Wakes with Elegant...')
return False
print('Injector S-band wakes applied...')
return True
def importElegantFile(self, input = 'convert.sdds'):
dist = pysdds.read(input)
self.Q = dist.par('Charge').data[0]
self.x = dist.col('x').data[0]
self.xp = dist.col('xp').data[0]
self.y = dist.col('y').data[0]
self.yp = dist.col('yp').data[0]
self.t = dist.col('t').data[0]
self.p = dist.col('p').data[0]
self.t -= np.mean(self.t)
self.meanP=np.mean(self.p)
print('Particle distribution imported...')
print(' Count: %d' % self.t.size)
print(' Charge: %5.1f pC' % (self.Q*1e12))
print(' Mean Energy: %5.1f MeV' % (self.meanP*0.511))
plt.scatter(self.t,self.p,s=0.5)
plt.show()
return True
def cutDistribution(self,sigma=0):
if sigma <=0:
return
rmsT=np.std(self.t)
idx=np.argwhere(np.abs(self.t)< rmsT*sigma)
N0 = float(len(self.t))
self.x=self.x[idx]
self.y=self.y[idx]
self.xp=self.xp[idx]
self.yp=self.yp[idx]
self.p=self.p[idx]
self.t=self.t[idx]
N1 = float(len(self.t))
self.Q = self.Q*N1/N0
print('Particle distribution cut...')
print(' Count: %d' % self.t.size)
print(' Charge: %5.1f pC' % (self.Q * 1e12))
def analyseBeam(self,N = 20):
betay, alphay, emity = self.getTwiss(self.y, self.yp, self.p)
betax, alphax, emitx = self.getTwiss(self.x, self.xp, self.p)
print('Analysis of distribution:')
print(' emit in x (nm): %5.1f' % (emitx*1e9))
print(' beta in x (m): %6.3f' % betax)
print(' alpha in x (rad): %6.3f' % alphax)
print(' emit in y (nm): %5.1f' % (emity * 1e9))
print(' beta in y (m): %6.3f' % betay)
print(' alpha in y (rad): %6.3f' % alphay)
def getTwiss(self,x,xp,p):
x1 = np.mean(x)
x2= np.mean(x*x)
xp1 = np.mean(xp)
xp2 = np.mean(xp*xp)
xxp1 = np.mean(x*xp)
g1=np.mean(p)
ex = np.sqrt((x2-x1*x1)*(xp2-xp1*xp1)-(xxp1-x1*xp1)**2)*g1
bx = (x2-x1*x1)*g1/ex
ax = - (xxp1-x1*xp1)*g1/ex
return bx,ax,ex
def exportElegantDistribution(self,output = 'SF_input_dist.sdds'):
meas_df = {'t':self.t[:,0],'p':self.p[:,0],'x':self.x[:,0],'xp':self.xp[:,0],
'y':self.y[:,0],'yp':self.yp[:,0],}
df_meas = pd.DataFrame.from_dict(meas_df)
parameters = {'Charge': [float(self.Q)]}
subprocess.run(['rm',output])
sdds = pysdds.SDDSFile.from_df([df_meas], parameter_dict=parameters, mode='binary')
sdds.validate_data()
pysdds.write(sdds, output)
if __name__ == '__main__':
parser = ArgumentParser()
parser.add_argument('-input', type=str, help='Astra Particle Distribution File', default='')
parser.add_argument('-output', type=str, help='SDDS Output Distribution', default='SF_input_dist.sdds')
parser.add_argument('-cut', type=float, help='Cut particles beyond the value, given in units of rms duration', default=2.2)
parser.add_argument('-nslice', type=int, help='number of slices for analysis', default=20)
args = parser.parse_args()
a2e =Astra2ElegantConverter()
if not a2e.importAstraFile(input = args.input):
sys.exit(0)
if not a2e.importElegantFile('convert.sdds'):
sys.exit(0)
if not a2e.applyWakes():
sys.exit(0)
if not a2e.importElegantFile('applywakes.out'):
sys.exit(0)
a2e.cutDistribution(sigma = args.cut)
a2e.analyseBeam(N=args.nslice)
a2e.exportElegantDistribution(output = args.output)
sys.exit(0)