154 lines
5.6 KiB
Python
154 lines
5.6 KiB
Python
from time import sleep
|
|
from epics import PV,caget_many, caput_many
|
|
import numpy as np
|
|
from threading import Thread
|
|
from time import sleep
|
|
|
|
#from onlinemodel.interface import SwissFELMagnet
|
|
|
|
class Magnet:
|
|
|
|
def __init__(self, beamline = 'ARAMIS',debug=False, signal = None, cal = None):
|
|
self.beamline = beamline
|
|
self.debug=debug
|
|
self.signal = signal
|
|
self.progress = None
|
|
# self.cal = SwissFELMagnet()
|
|
# getting the array
|
|
temp = np.sort(np.array(list(range(0, 11)) + [0.5, 1.5]))[::-1]
|
|
temp2 = -1. * temp
|
|
cyclingCurrents = np.ones(len(temp) * 2) * 0.
|
|
cyclingCurrents[::2] = temp
|
|
cyclingCurrents[1::2] = temp2
|
|
self.currents = cyclingCurrents[:-1]
|
|
self.cyclingPause = 3
|
|
self.pvcor = []
|
|
self.abort = False
|
|
|
|
def doAbort(self):
|
|
self.abort = True
|
|
|
|
def setMagnetI(self,maglist,vals):
|
|
if self.debug:
|
|
for i,ele in enumerate(maglist):
|
|
print('DEBUG: Setting %s to: %f A' % (ele,vals[i]))
|
|
return
|
|
caput_many(maglist,vals)
|
|
|
|
def setMagnets(self,maglist,magbd,erg):
|
|
for mag in maglist.keys():
|
|
Itar = self.cal.QuadrupoleKL2I(magbd[mag],maglist[mag],erg*1e6)
|
|
pv = PV('%s:I-SET' % mag)
|
|
if not Itar is None:
|
|
if self.debug:
|
|
self.signal.emit('DEBUG: Setting %s to %f' % (pv.pvname,Itar))
|
|
else:
|
|
pv.value = Itar
|
|
|
|
def scaleMagnets(self,maglist,magbg,EProfCur,EProfTar):
|
|
# get current from magnets
|
|
for i,mag in enumerate(maglist):
|
|
if 'SAR' in mag:
|
|
ecur = EProfCur[-1]
|
|
etar = EProfTar[-1]
|
|
elif 'S30' in mag:
|
|
idx = int(mag[5:7])
|
|
if idx >= len(EProfCur):
|
|
ecur = EProfCur[-1]
|
|
etar = EProfTar[-1]
|
|
else:
|
|
ecur = EProfCur[idx]
|
|
etar = EProfTar[idx]
|
|
if 'MQUA' in mag:
|
|
pv = PV('%s:I-SET' % mag)
|
|
valmag = pv.value
|
|
strength = self.cal.QuadrupoleI2KL(magbg[i],valmag,ecur*1e6)
|
|
Itar = self.cal.QuadrupoleKL2I(magbg[i],strength,etar*1e6)
|
|
elif 'MSEX' in mag:
|
|
pv = PV('%s:I-SET' % mag)
|
|
valmag = pv.value
|
|
strength = self.cal.SextupoleI2KL(magbg[i],valmag,ecur*1e6)
|
|
Itar = self.cal.SextupoleKL2I(magbg[i],strength,etar*1e6)
|
|
elif 'MBND' in mag:
|
|
pv = PV('%s:P-SET' % mag)
|
|
valmag=pv.value
|
|
strength = pv.value
|
|
Itar = valmag-EProfCur[-1]+etar
|
|
else:
|
|
strength = 0
|
|
if not Itar is None:
|
|
if self.debug:
|
|
self.signal.emit('DEBUG: Setting %s to %f' % (pv.pvname,Itar))
|
|
else:
|
|
pv.value = Itar
|
|
|
|
def cycleCorrector(self,QuadList=[],progress=None,done = None):
|
|
self.progress = progress
|
|
self.done = done # signal to indicate end of thread
|
|
self.pvcor = [PV(ele.replace('MQUA','MCRX')) for ele in QuadList]+[PV(ele.replace('MQUA','MCRY')) for ele in QuadList]
|
|
self.abort = False
|
|
Thread(name='CycleCor',target=self.TcycleCorrector).start() # starting thread which starts and monitors actual thread
|
|
|
|
def TcycleCorrector(self):
|
|
self.running = True
|
|
istep = 0
|
|
nstep = len(self.currents)
|
|
while self.running:
|
|
if self.progress:
|
|
self.progress.emit(istep,nstep)
|
|
if self.signal:
|
|
self.signal.emit('Setting corrector current to %5.1f A' % self.currents[istep])
|
|
if self.debug:
|
|
print('DEBUG: Setting Corrector Magnets to:',self.currents[istep])
|
|
else:
|
|
for pv in self.pvcor:
|
|
pv.value = cyclingCurrents[istep]
|
|
sleep(self.cyclingPause)
|
|
istep+=1
|
|
if istep == nstep or self.abort:
|
|
self.running = False
|
|
if self.progress:
|
|
self.progress.emit(nstep,nstep)
|
|
if self.done:
|
|
self.done.emit(not self.abort)
|
|
|
|
|
|
|
|
def cycleMagnets(self,maglist, progress = None, done = None):
|
|
if len(maglist) == 0:
|
|
done.emit(True)
|
|
return -1
|
|
self.progress = progress
|
|
self.done = done # signal to indicate end of thread
|
|
state = caget_many(['%s:CYCLE-STATE' % mag for mag in maglist])
|
|
magcyc = [mag for i,mag in enumerate(maglist) if not state[i] == 2]
|
|
time = caget_many(['%s:CYCLE-PROGFULL' % mag for mag in magcyc])
|
|
if len(time) == 0:
|
|
done.emit(True)
|
|
return -1 # no cycling needed
|
|
imax = np.argmax(time)
|
|
self.tcyc = time[imax]
|
|
print('Cycling Time:',self.tcyc,'sec for magnet',magcyc[imax])
|
|
if self.debug:
|
|
print('DEBUG: Cycle-SET magnets')
|
|
else:
|
|
caput_many(['%s:CYCLE' % mag for mag in magcyc],[2]*len(magcyc))
|
|
self.abort = False
|
|
Thread(name='CycleMag',target=self.TcycleMagnets).start()
|
|
return self.tcyc
|
|
|
|
def TcycleMagnets(self):
|
|
self.running = True
|
|
tcur = 0
|
|
while self.running:
|
|
sleep(1)
|
|
tcur += 1
|
|
if self.progress:
|
|
self.progress.emit(tcur,self.tcyc)
|
|
if tcur >= self.tcyc or self.abort:
|
|
self.running = False
|
|
if self.progress:
|
|
self.progress.emit(self.tcyc,self.tcyc)
|
|
if self.done:
|
|
self.done.emit(not self.abort)
|