440 lines
17 KiB
Python
440 lines
17 KiB
Python
import json
|
|
import copy
|
|
import numpy as np
|
|
|
|
from onlinemodel.core import Facility
|
|
from onlinemodel.madx import CMadX
|
|
#from sipbuild.generator.outputs.formatters import variable
|
|
|
|
converttwiss= {'betax':'betx','betay':'bety','alphax':'alfx','alphay':'alfy',
|
|
'etax':'dx','etay':'dy','etapx':'dpx','etapy':'dpy','mux':'mux','muy':'muy',
|
|
'x':'x','y':'y','px':'px','py':'py','energy':'energy'}
|
|
|
|
|
|
maxgradient= {'QFF':50.,'QFM':50,'QFD':20,'QFS':20,'QFDM':20,'QFA':31.75,'HFA':37.,'HFB':1445}
|
|
|
|
class Model:
|
|
def __init__(self, phase=0, parent=None):
|
|
print('Initializing online model ...')
|
|
self.phase = phase # current planned future
|
|
self.parent=parent
|
|
self.om = Facility(init=1, alt = phase)
|
|
self.order = None
|
|
|
|
self.madx = CMadX()
|
|
self.startTwiss = None
|
|
self.startEnergy = None
|
|
self.energyReference ='SINLH02.MBND100'
|
|
|
|
# hook up events
|
|
self.matchplot=True
|
|
self.eventHandling()
|
|
|
|
def eventHandling(self):
|
|
self.parent.UITrack.clicked.connect(self.track)
|
|
self.parent.UIMatchSelected.clicked.connect(self.match)
|
|
|
|
|
|
def getLatticeVersion(self):
|
|
return self.om.Version
|
|
|
|
def getVariableInfo(self,varlist):
|
|
variable = {}
|
|
for var in varlist:
|
|
ele = self.om.getElement(var.replace('-','.'))
|
|
if ele is None:
|
|
variable[var]={'Val':0,'Max':None}
|
|
else:
|
|
if 'MQUA' in var or 'MQSK' in var or 'MSEX' in var:
|
|
bg = ele.Baugruppe
|
|
Brho = self.om.EnergyAt(var.replace('-','.'))[0]/3e8
|
|
if bg in maxgradient.keys():
|
|
mgrad=maxgradient[bg]/Brho
|
|
else:
|
|
mgrad=None
|
|
if 'MSEX' in var:
|
|
variable[var]={'Val':ele.k2,'Max':mgrad}
|
|
else:
|
|
variable[var] = {'Val': ele.k1, 'Max': mgrad}
|
|
else:
|
|
variable[var]={'Val':0,'Max':None}
|
|
return variable
|
|
|
|
|
|
def updateFromMachine(self,machine):
|
|
pol = ['LH', 'LV+', 'LV-', 'C+', 'C-', 'ZL']
|
|
mag = machine['Magnet']
|
|
for key in mag:
|
|
keyom=key.replace('-','.')
|
|
self.updateElement(keyom,[mag[key]]) # needs to be a list
|
|
und = machine['Undulator']
|
|
for key in und:
|
|
keyom=key.replace('-','.')
|
|
self.updateElement(keyom,[und[key][0],pol[int(und[key][1])]])
|
|
rf = machine['RF']
|
|
for key in rf:
|
|
keyom=key.replace('-','.')
|
|
self.updateElement(keyom,rf[key])
|
|
energy = machine['Energy']
|
|
for key in energy:
|
|
self.updateEnergy(energy[key])
|
|
|
|
def updateEnergy(self,E0):
|
|
if isinstance(E0,list):
|
|
E0=E0[0]
|
|
self.om.forceEnergyAt(self.energyReference, E0*1e6)
|
|
|
|
def updateElement(self,name,val):
|
|
if 'MQUA' in name or 'MQSK' in name:
|
|
L = self.om.getRegExpElement(name[0:7], name[8:15],'Length')[0]
|
|
self.om.setRegExpElement(name[0:7], name[8:15], 'k1', float(val[0])/L)
|
|
if 'MSEX' in name:
|
|
L = self.om.getRegExpElement(name[0:7], name[8:15], 'Length')[0]
|
|
self.om.setRegExpElement(name[0:7], name[8:15], 'k2', float(val[0])/L)
|
|
if 'MBND' in name:
|
|
self.om.setRegExpElement(name[0:7], 'MBND', 'angle', float(val[0]))
|
|
if 'UMOD' in name:
|
|
self.om.setRegExpElement(name[0:7], 'UMOD', 'K', float(val[0]))
|
|
if 'UIND' in name:
|
|
self.om.setRegExpElement(name[0:7], name[8:15], 'K', float(val[0]))
|
|
if 'SATUN' in name:
|
|
kx = 0
|
|
ky = 1
|
|
if 'LV' in val[1]:
|
|
kx = 1
|
|
ky = 0
|
|
if 'C' in val[1]:
|
|
kx = 0.5
|
|
ky=0.5
|
|
self.om.setRegExpElement(name[0:7], name[8:15], 'kx', kx)
|
|
self.om.setRegExpElement(name[0:7], name[8:15], 'ky', ky)
|
|
if 'RSYS' in name:
|
|
grad = float(val[0])
|
|
phase = float(val[1])
|
|
if 'CB' in name[0:7]:
|
|
grad = grad/ 4.
|
|
elif 'XB' in name[0:7] or 'SINSB03' in name or 'SINSB04' in name:
|
|
grad = grad/2
|
|
L = self.om.getRegExpElement(name[0:7], 'RACC', 'Length')[0]
|
|
self.om.setRegExpElement(name[0:7], 'RACC', 'Gradient', grad/L)
|
|
self.om.setRegExpElement(name[0:7], 'RACC', 'Phase', phase)
|
|
|
|
|
|
def getElements(self):
|
|
return self.om.listElement('*', 1)
|
|
|
|
def getSettings(self):
|
|
elements = self.getElements()
|
|
quadrupoles={}
|
|
sextupoles={}
|
|
dipoles={}
|
|
rf={}
|
|
undulators={}
|
|
kicker={}
|
|
energy={'location': self.energyReference, 'energy' : 1e-6*self.om.EnergyAt(self.energyReference)[0]}
|
|
for ele in elements:
|
|
if 'MQUA' in ele.Name or 'MQSK' in ele.Name:
|
|
quadrupoles[ele.Name]={'k1':ele.k1,'k1L':ele.k1*ele.Length}
|
|
elif 'MSEX' in ele.Name:
|
|
sextupoles[ele.Name]={'k2':ele.k2,'k2L':ele.k2*ele.Length}
|
|
elif 'MBND' in ele.Name:
|
|
if 'SINLH' in ele.Name or 'SINBC' in ele.Name or 'S10BC' in ele.Name or 'SATMA' in ele.Name or 'SATUN' in ele.Name:
|
|
dipoles[ele.Name]={'angle':ele.angle}
|
|
elif 'UIND' in ele.Name or 'UMOD' in ele.Name:
|
|
undulators[ele.Name]={'K':ele.K,'kx':ele.kx,'ky':ele.ky}
|
|
elif 'RACC' in ele.Name:
|
|
rf[ele.Name]={'Gradient':ele.Gradient*ele.Length,'Phase':ele.Phase}
|
|
elif 'MKAC' in ele.Name or 'MKDC' in ele.Name:
|
|
kicker[ele.Name] = {'cory': ele.cory}
|
|
return {'Quadrupole':quadrupoles,'Sextupole':sextupoles,'Dipole':dipoles,'RF':rf,'Undulator':undulators,
|
|
'Kicker':kicker,'Energy':energy, 'InitialCondition':self.startTwiss}
|
|
|
|
def loadSettingsGroup(self,group,fields,normalized=False):
|
|
for key in group.keys():
|
|
ele = self.om.getElement(key)
|
|
for field in fields:
|
|
ele.__dict__[field]=group[key][field]
|
|
if normalized:
|
|
ele.__dict__[field]/=ele.Length
|
|
|
|
def loadSettings(self,settings):
|
|
self.loadSettingsGroup(settings['Quadrupole'],['k1'])
|
|
self.loadSettingsGroup(settings['Sextupole'], ['k2'])
|
|
self.loadSettingsGroup(settings['Dipole'], ['angle'])
|
|
self.loadSettingsGroup(settings['RF'], ['Gradient'],normalized=True)
|
|
self.loadSettingsGroup(settings['RF'], ['Phase'])
|
|
self.loadSettingsGroup(settings['Undulator'], ['K','kx','ky'])
|
|
self.loadSettingsGroup(settings['Kicker'], ['cory'])
|
|
self.startEnergy = settings['Energy']['energy']
|
|
self.energyReference = settings['Energy']['location']
|
|
self.startTwiss = settings['InitialCondition']
|
|
print(self.startTwiss)
|
|
self.updateEnergy(self.startEnergy)
|
|
print(settings['Energy']['energy'])
|
|
print('Energy in system',self.om.EnergyAt(self.energyReference)[0])
|
|
|
|
##### very old code should be become obsolete after some nice matching files
|
|
def updateModel(self):
|
|
file ='SwissFELRef.json'
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
for key in data['settings'].keys():
|
|
tags=key.split(':')
|
|
elename = tags[0].replace('-','.')
|
|
fld = tags[1]
|
|
ele = self.om.getElement(elename)
|
|
if 'K1L' == fld:
|
|
ele.k1 = data['settings'][key]/ele.Length
|
|
elif 'K2L' == fld:
|
|
ele.k2 = data['settings'][key]/ele.Length
|
|
elif 'Grad' == fld:
|
|
# ele.Gradient = data['settings'][key]/ele.Length
|
|
ele.Gradient = data['settings'][key]
|
|
elif 'Phase' == fld:
|
|
ele.Phase = data['settings'][key]
|
|
elif 'kx' == fld:
|
|
ele.kx = data['settings'][key]
|
|
elif 'ky' == fld:
|
|
ele.ky = data['settings'][key]
|
|
elif 'K' == fld:
|
|
ele.K = data['settings'][key]
|
|
elif 'Gap' == fld:
|
|
ele.Gap = data['settings'][key]
|
|
elif 'Offset' == fld:
|
|
ele.Offset = data['settings'][key]
|
|
elif 'K0L' == fld:
|
|
ele.angle = data['settings'][key]
|
|
elif 'KICK' == fld:
|
|
ele.cory=data['settings'][key]
|
|
else:
|
|
print('Unknown tag:', key, data['settings'][key])
|
|
|
|
##############################33
|
|
# tracking
|
|
|
|
|
|
def match(self):
|
|
config = self.parent.reference.getMatchingPoint()
|
|
if config is False:
|
|
return
|
|
self.setBranch(config['destination'])
|
|
sequence = config['sequence']
|
|
ID = config['ID']
|
|
start=config['start']['Location']
|
|
end=config['end']
|
|
if len(end) == 7:
|
|
end=end+'$end'
|
|
if 'Twiss' in config['start']:
|
|
itwiss = {converttwiss[key.lower()]: config['start']['Twiss'][key] for key in
|
|
config['start']['Twiss'].keys()}
|
|
else:
|
|
itwiss = None
|
|
|
|
|
|
var={}
|
|
nvar=0
|
|
for key in config['variable'].keys():
|
|
nvar+=1
|
|
key0=key.replace('-','.').lower()
|
|
if 'mqua' in key0 or 'mqsk' in key0:
|
|
key0+='.k1'
|
|
elif 'msex' in key0:
|
|
key0+='.k2'
|
|
elif 'mkac' in key0 or 'mkdc' in key0:
|
|
key0+='.cory'
|
|
var[key0]=config['variable'][key]
|
|
|
|
target = config['target']
|
|
if 'Script' in target.keys():
|
|
res,twiss,err = self.madx.callScript(script = target['Script'],sequence=sequence, start=start, end=end,init=itwiss,var=var)
|
|
|
|
self.updateModelFromMatching(res)
|
|
self.parent.reference.updateMatchPoint(ID, err)
|
|
if self.matchplot:
|
|
energy = self.calcEnergyProfile(twiss)
|
|
self.parent.plot.newData(twiss, energy)
|
|
return
|
|
|
|
ncon = 0
|
|
condilist = {}
|
|
for key in target.keys():
|
|
condi={}
|
|
for ele in target[key]:
|
|
if isinstance(ele,tuple):
|
|
condi[converttwiss[ele[0].lower()]]={'Condition': ele[1],'Val':ele[2]}
|
|
ncon+=1
|
|
elif isinstance(ele,str):
|
|
print('Needs matching results from', ele)
|
|
return
|
|
condilist[key]=condi
|
|
print('Variables:',nvar)
|
|
print('Conditions:',ncon)
|
|
if nvar > ncon:
|
|
print('Adding dummy constraints from initial conditions')
|
|
if not start in condilist.keys():
|
|
condilist[start]={}
|
|
condi={}
|
|
for twkey in itwiss.keys():
|
|
if ncon < nvar:
|
|
condi[twkey]={'Condition':0,'Val':itwiss[twkey]}
|
|
ncon+=1
|
|
condilist[start]=condi
|
|
|
|
for key in condilist.keys():
|
|
print(key,condilist[key])
|
|
|
|
random = self.parent.UIMatchRandom.isChecked()
|
|
res,twiss,err=self.madx.match(sequence=sequence, start=start, end=end,
|
|
init=itwiss, var=var, const=condilist,
|
|
preset=False,random=random)
|
|
|
|
self.updateModelFromMatching(res)
|
|
if config['save'] == True:
|
|
self.parent.reference.saveTwiss(ID,twiss)
|
|
self.parent.reference.updateMatchPoint(ID,err)
|
|
if self.matchplot:
|
|
energy = self.calcEnergyProfile(twiss)
|
|
self.parent.plot.newData(twiss,energy)
|
|
|
|
|
|
def updateModelFromMatching(self,var):
|
|
for magm in var.keys():
|
|
mag0 = magm[0:15]
|
|
val = var[magm]
|
|
ele = self.om.getElement(mag0)
|
|
if 'mqua' in magm or 'mqsk' in magm:
|
|
if not ele is None:
|
|
ele.k1 = val
|
|
print('Updating %s to k1: %8.4f' % (mag0, val))
|
|
magnets=self.parent.reference.getDependence(mag0)
|
|
if not magnets is None:
|
|
for magd in magnets:
|
|
ele = self.om.getElement(magd)
|
|
if not ele is None:
|
|
ele.k1 = val
|
|
print('Updating %s to k1: %8.4f' % (magd, val))
|
|
elif 'mkac' in magm or 'mkdc' in magm:
|
|
if not ele is None:
|
|
ele.design_kick = val
|
|
ele.cory = val
|
|
print('Updating %s to design_kick: %8.4f' % (mag0, val))
|
|
##################
|
|
# tracking
|
|
|
|
def track(self):
|
|
start = str(self.parent.UITrackStart.text()).upper()
|
|
end = str(self.parent.UITrackEnd.text()).upper()
|
|
if len(start)>7:
|
|
start = start[0:7]
|
|
if len(end)>7:
|
|
end = end[0:7]
|
|
refloc, twiss0 = self.parent.reference.getReference()
|
|
|
|
if refloc.upper() == 'START':
|
|
refloc = start.upper()
|
|
twiss0['energy'] = self.startEnergy*1e-3 # convert to GeV for madx
|
|
start, end = self.checkRange(start, end, refloc[0:7])
|
|
if start is None:
|
|
return
|
|
print('Tracking from',start,'to',end)
|
|
self.doTrack(start,end,refloc,twiss0)
|
|
|
|
|
|
def doTrack(self, start,end, refloc, twiss_in, plot = True):
|
|
twiss0 = {converttwiss[key]:twiss_in[key] for key in twiss_in.keys()}
|
|
self.setBranch(end.upper())
|
|
if not refloc == start:
|
|
twiss0 = self.doBackTrack(refloc,start,twiss0)
|
|
print('Back Tracking Done')
|
|
for key in twiss_in.keys():
|
|
if converttwiss[key] in twiss0.keys():
|
|
self.startTwiss[key+'0'] = twiss0[converttwiss[key]]
|
|
# self.madx.updateVariables(twiss0)
|
|
twiss = self.madx.track('swissfel',start+'$START',end+'$END',twiss0)
|
|
energy = self.calcEnergyProfile(twiss)
|
|
if plot:
|
|
self.parent.plot.newData(twiss,energy)
|
|
|
|
|
|
|
|
|
|
def doBackTrack(self,start=None,end=None,twiss0=None):
|
|
print('Backtracking 1:',twiss0)
|
|
twiss0['alfx'] = -twiss0['alfx'] # revert particle trajectories
|
|
twiss0['alfy'] = -twiss0['alfy']
|
|
twiss0['dpx'] = -twiss0['dpx']
|
|
twiss0['dpy'] = -twiss0['dpy']
|
|
twiss0['px'] = -twiss0['px']
|
|
twiss0['py'] = -twiss0['py']
|
|
# self.madx.updateVariables(twiss0)
|
|
# print(start,end)
|
|
twiss = self.madx.track('invswissfel', start, end+'$START',twiss0)
|
|
twiss1 = {}
|
|
twiss1['betx'] = twiss.betx[-1] # revert trajectory back
|
|
twiss1['bety'] = twiss.bety[-1]
|
|
twiss1['alfx'] = -twiss.alfx[-1]
|
|
twiss1['alfy'] = -twiss.alfy[-1]
|
|
twiss1['dx'] = twiss.dx[-1]
|
|
twiss1['dy'] = twiss.dy[-1]
|
|
twiss1['dpx'] = -twiss.dpx[-1]
|
|
twiss1['dpy'] = -twiss.dpy[-1]
|
|
twiss1['x'] = twiss.x[-1]
|
|
twiss1['y'] = twiss.y[-1]
|
|
twiss1['px'] = -twiss.px[-1]
|
|
twiss1['py'] = -twiss.py[-1]
|
|
return twiss1
|
|
|
|
def calcEnergyProfile(self,twiss):
|
|
energy = np.array([0. for i in range(len(twiss.betx))])
|
|
e0 = 0.
|
|
for i, name in enumerate(twiss.name):
|
|
if len(name) > 15:
|
|
elename = name[0:15]
|
|
ele = self.om.getElement(elename)
|
|
if not ele is None:
|
|
erg= self.om.EnergyAt(ele)
|
|
if e0 == 0.:
|
|
energy[:i]+=erg[0]*1e-6
|
|
e0 = (erg[0]+erg[1])*1e-6
|
|
energy[i]=e0
|
|
return energy
|
|
|
|
def setBranch(self,end):
|
|
destination = 'ARAMIS'
|
|
if 'SPO' in end:
|
|
destination = 'PORTHOS'
|
|
elif 'SAT' in end:
|
|
destination = 'ATHOS'
|
|
elif 'S10BD' in end or 'SIN' in end:
|
|
destination = 'INJECTOR'
|
|
self.om.setBranch(destination,'SINLH01')
|
|
self.order=self.om.getBranchElements()
|
|
self.madx.updateLattice(self.om,destination)
|
|
|
|
def checkRange(self,start,end,ref):
|
|
if self.om.getSection(start) is None:
|
|
print('Invalid staring point for tracking. Setting to SINLH01')
|
|
startsec='SINLH01'
|
|
else:
|
|
startsec = self.om.getSection(start).Element[0].Name
|
|
if self.om.getSection(end) is None:
|
|
print('Invalid staring point for tracking. Setting to starting section')
|
|
endsec = startsec
|
|
else:
|
|
endsec = self.om.getSection(end).Element[0].Name
|
|
if self.om.getSection(ref) is None:
|
|
print('Invalid section for reference. Aborting tracking')
|
|
return None,None
|
|
refsec = self.om.getSection(ref).Element[0].Name
|
|
if not self.om.isUpstream(startsec,endsec):
|
|
return self.checkRange(end,start,ref)
|
|
if not self.om.isUpstream(startsec,refsec):
|
|
start = ref
|
|
print('Reference point is upstream the tracking range. Extending range')
|
|
if not self.om.isUpstream(refsec,endsec):
|
|
end = ref
|
|
print('Reference point is downstream the tracking range. Extending range')
|
|
return start,end
|
|
|
|
|