from time import sleep from epics import PV,caget_many, caput_many import numpy as np from threading import Thread from time import sleep #from onlinemodel.interface import SwissFELMagnet class Magnet: def __init__(self, beamline = 'ARAMIS',debug=False, signal = None, cal = None): self.beamline = beamline self.debug=debug self.signal = signal self.progress = None # self.cal = SwissFELMagnet() # getting the array temp = np.sort(np.array(list(range(0, 11)) + [0.5, 1.5]))[::-1] temp2 = -1. * temp cyclingCurrents = np.ones(len(temp) * 2) * 0. cyclingCurrents[::2] = temp cyclingCurrents[1::2] = temp2 self.currents = cyclingCurrents[:-1] self.cyclingPause = 3 self.pvcor = [] self.abort = False def doAbort(self): self.abort = True def setMagnetI(self,maglist,vals): if self.debug: for i,ele in enumerate(maglist): print('DEBUG: Setting %s to: %f A' % (ele,vals[i])) return caput_many(maglist,vals) def setMagnets(self,maglist,magbd,erg): for mag in maglist.keys(): Itar = self.cal.QuadrupoleKL2I(magbd[mag],maglist[mag],erg*1e6) pv = PV('%s:I-SET' % mag) if not Itar is None: if self.debug: self.signal.emit('DEBUG: Setting %s to %f' % (pv.pvname,Itar)) else: pv.value = Itar def scaleMagnets(self,maglist,magbg,EProfCur,EProfTar): # get current from magnets for i,mag in enumerate(maglist): if 'SAR' in mag: ecur = EProfCur[-1] etar = EProfTar[-1] elif 'S30' in mag: idx = int(mag[5:7]) if idx >= len(EProfCur): ecur = EProfCur[-1] etar = EProfTar[-1] else: ecur = EProfCur[idx] etar = EProfTar[idx] if 'MQUA' in mag: pv = PV('%s:I-SET' % mag) valmag = pv.value strength = self.cal.QuadrupoleI2KL(magbg[i],valmag,ecur*1e6) Itar = self.cal.QuadrupoleKL2I(magbg[i],strength,etar*1e6) elif 'MSEX' in mag: pv = PV('%s:I-SET' % mag) valmag = pv.value strength = self.cal.SextupoleI2KL(magbg[i],valmag,ecur*1e6) Itar = self.cal.SextupoleKL2I(magbg[i],strength,etar*1e6) elif 'MBND' in mag: pv = PV('%s:P-SET' % mag) valmag=pv.value strength = pv.value Itar = valmag-EProfCur[-1]+etar else: strength = 0 if not Itar is None: if self.debug: self.signal.emit('DEBUG: Setting %s to %f' % (pv.pvname,Itar)) else: pv.value = Itar def cycleCorrector(self,QuadList=[],progress=None,done = None): self.progress = progress self.done = done # signal to indicate end of thread self.pvcor = [PV(ele.replace('MQUA','MCRX')) for ele in QuadList]+[PV(ele.replace('MQUA','MCRY')) for ele in QuadList] self.abort = False Thread(name='CycleCor',target=self.TcycleCorrector).start() # starting thread which starts and monitors actual thread def TcycleCorrector(self): self.running = True istep = 0 nstep = len(self.currents) while self.running: if self.progress: self.progress.emit(istep,nstep) if self.signal: self.signal.emit('Setting corrector current to %5.1f A' % self.currents[istep]) if self.debug: print('DEBUG: Setting Corrector Magnets to:',self.currents[istep]) else: for pv in self.pvcor: pv.value = cyclingCurrents[istep] sleep(self.cyclingPause) istep+=1 if istep == nstep or self.abort: self.running = False if self.progress: self.progress.emit(nstep,nstep) if self.done: self.done.emit(not self.abort) def cycleMagnets(self,maglist, progress = None, done = None): if len(maglist) == 0: done.emit(True) return -1 self.progress = progress self.done = done # signal to indicate end of thread state = caget_many(['%s:CYCLE-STATE' % mag for mag in maglist]) magcyc = [mag for i,mag in enumerate(maglist) if not state[i] == 2] time = caget_many(['%s:CYCLE-PROGFULL' % mag for mag in magcyc]) if len(time) == 0: done.emit(True) return -1 # no cycling needed imax = np.argmax(time) self.tcyc = time[imax] print('Cycling Time:',self.tcyc,'sec for magnet',magcyc[imax]) if self.debug: print('DEBUG: Cycle-SET magnets') else: caput_many(['%s:CYCLE' % mag for mag in magcyc],[2]*len(magcyc)) self.abort = False Thread(name='CycleMag',target=self.TcycleMagnets).start() return self.tcyc def TcycleMagnets(self): self.running = True tcur = 0 while self.running: sleep(1) tcur += 1 if self.progress: self.progress.emit(tcur,self.tcyc) if tcur >= self.tcyc or self.abort: self.running = False if self.progress: self.progress.emit(self.tcyc,self.tcyc) if self.done: self.done.emit(not self.abort)