import datetime import re import numpy as np from bsread import dispatcher import epics from slic.core.adjustable import PVAdjustable from slic.core.acquisition import BSAcquisition, PVAcquisition from slic.core.scanner import Scanner from sfbd.ext import CounterAdjustable from sfbd.ext import BSCAcquisition def getAux(pvs=None): if not pvs: return ret={} val = epics.caget_many(pvs) for i,pv in enumerate(pvs): if val[i]: # filter out None values ret[pv]=float(val[i]) epics.ca.clear_cache() return ret def getBSChannels(regexp): prog = re.compile(regexp) res = [] for bs in dispatcher.get_current_channels(): if prog.match(bs['name']): res.append(bs['name']) return res class Dispersion: def __init__(self, branch = 'Aramis'): self.scanname = 'Dispersion' self.branch = 'None' dirpath= datetime.datetime.now().strftime('measurements/%Y/%m/%d/slic_sfbd') self.basedir = '/sf/data/%s/%s' % (dirpath,self.scanname) self.pgroup='%s/%s' % (dirpath,self.scanname) self.scandir='/sf/data/'+self.pgroup self.setBranch(branch) self.sc = None self.Nsteps = 2 self.Nsamples = 1 def setBranch(self,branch = 'Aramis'): self.sc = None if branch == 'Athos Dump': self.setupAthosDump() elif branch == 'Aramis': self.setupAramis() elif branch == 'Athos': self.setupAthos() else: self.scanner = None return # path is define in the various set-up procedures self.scanner = Scanner(data_base_dir=self.basedir,scan_info_dir=self.basedir, make_scan_sub_dir=True, default_acquisitions=self.acq) def getRFCalibrationChannels(self,sensor2,energy): aux=[] for sen in sensor2: if 'PHASE-VS' in sen: aux.append(sen.replace('PHASE-VS','GET-VSUM-PHASE-OFFSET').replace('RLLE-DSP','RSYS')) aux.append(sen.replace('PHASE-VS','GET-VSUM-AMPLT-SCALE').replace('RLLE-DSP','RSYS')) aux.append(sen.replace('PHASE-VS','SM-GET').replace('RLLE-DSP','RMSM')) aux.append(energy) return aux def setupAthosDump(self): self.branch='Athos_Dump' pgroup = '%s-%s' % (self.pgroup,self.branch) self.path = '/sf/data/'+pgroup # pre-scan item - needs an adjustment of the dipole current of about 2.3e-3 - to be implemented later. self.pre = {} self.pre['SFB_BEAM_DUMP_AT:ONOFF1']={'Val':0,'InitVal':0} self.pre['SFB_ORBIT_SAT:ONOFF1']={'Val':0,'InitVal':0} for i in range(1,5): self.pre['SFB_ORBIT_SAT_%2.2d:ONOFF1' % i ]={'Val':0,'InitVal':0} for pv in self.pre.keys(): self.pre[pv]['adj']=PVAdjustable(pv) # adjustable self.adjSV = 'SATCB01-RSYS:SET-BEAM-PHASE' self.adjRB = 'SATCB01-RSYS:GET-BEAM-PHASE' self.adj = PVAdjustable(self.adjSV,pvname_readback = self.adjRB, accuracy = 0.1,ID = self.adjSV, name = "SATCB01-Linac") self.amp = 30 # the amplitude of the scan, which can be scaled # acquisition sensor1 = getBSChannels('SATBD02-DBPM.*:Y2$') sensor2 = getBSChannels('SATCB.*-RLLE-DSP:.*-VS$') self.sensor = sensor1+sensor2 self.acq = [BSAcquisition(".",pgroup, default_channels=self.sensor)] # auxiliar data to be read one self.aux = self.getRFCalibrationChannels(sensor2,'SATCL01-MBND100:ENERGY-OP') def setupAramis(self): self.branch='Aramis' pgroup = '%s-%s' % (self.pgroup,self.branch) self.path = '/sf/data/'+pgroup # pre-scan item self.pre = {} self.pre['SFB_BEAM_DUMP_AR:ONOFF1']={'Val':0,'InitVal':0} self.pre['SFB_BEAM_ENERGY_ECOL_AR:ONOFF1']={'Val':0,'InitVal':0} self.pre['SFB_ORBIT_S30:ONOFF1']={'Val':0,'InitVal':0} self.pre['SFB_ORBIT_SAR:ONOFF1']={'Val':0,'InitVal':0} for pv in self.pre.keys(): self.pre[pv]['adj']=PVAdjustable(pv) # adjustable self.adjSV = 'S30:SET-E-GAIN-OP' self.adjRB = 'S30:GET-E-GAIN-OP' self.adj = PVAdjustable(self.adjSV,pvname_readback = self.adjRB, accuracy = 0.1,ID = self.adjSV, name = "Linac3") self.amp = 20 # the amplitude of the scan, which can be scaled # acquisition sensor1 = getBSChannels('SAR.*DBPM.*:[XY]1$') sensor2 = getBSChannels('S[23].*-RLLE-DSP:.*-VS$') self.sensor = sensor1+sensor2 self.acq = [BSAcquisition(".",pgroup, default_channels=self.sensor)] # auxiliar data to be read one self.aux = self.getRFCalibrationChannels(sensor2,'S10BC02-MBND100:ENERGY-OP') def setupAthos(self): self.branch='Aramis' pgroup = '%s-%s' % (self.pgroup,self.branch) self.path = '/sf/data/'+pgroup # pre-scan item self.pre = {} self.pre['SFB_BEAM_DUMP_AT:ONOFF1']={'Val':0,'InitVal':0} self.pre['SFB_BEAM_ENERGY_ECOL_AT:ONOFF1']={'Val':0,'InitVal':0} self.pre['SFB_ORBIT_SWY:ONOFF1']={'Val':0,'InitVal':0} self.pre['SFB_ORBIT_SAT:ONOFF1']={'Val':0,'InitVal':0} for i in range(1,5): self.pre['SFB_ORBIT_SAT_%2.2d:ONOFF1' % i ]={'Val':0,'InitVal':0} for pv in self.pre.keys(): self.pre[pv]['adj']=PVAdjustable(pv) # adjustable self.adjSV = 'S20:SET-E-GAIN-OP' self.adjRB = 'S20:GET-E-GAIN-OP' self.adj = PVAdjustable(self.adjSV,pvname_readback = self.adjRB, accuracy = 0.1,ID = self.adjSV, name = "Linac 2 and 3") # self.adj2SV = 'S30:SET-E-GAIN-OP' # self.adj2RB = 'S30:GET-E-GAIN-OP' # self.adj2 = PVAdjustable(self.adj2SV,pvname_readback = self.adj2RB, accuracy = 0.1) # self.adj = CounterAdjustable(self.adj1,self.adj2) # combine the two channels self.amp = 10 # the amplitude of the scan, which can be scaled # acquisition sensor1 = getBSChannels('SAT[SDC].*DBPM.*:[XY]2$') sensor2 = getBSChannels('S[2].*-RLLE-DSP:.*-VS$') self.sensor = sensor1+sensor2 self.acq = [BSAcquisition(".",pgroup, default_channels=self.sensor)] # auxiliar data to be read one self.aux = self.getRFCalibrationChannels(sensor2,'S10BC02-MBND100:ENERGY-OP') def setup(self,scl = 1, Nsteps=5, Nsamples=5): val = self.adj.get_current_value(readback=False) # dval = self.amp*scl dval = 0 ######## edit this self.N = Nsteps self.Ns= Nsamples self.values=np.linspace(val-dval,val+dval,num=self.N) def preaction(self): for key in self.pre.keys(): self.pre[key]['InitVal'] = self.pre[key]['adj'].get_current_value(readback = False) self.pre[key]['adj'].set_target_value(self.pre[key]['Val']) def postaction(self): for key in self.pre.keys(): self.pre[key]['adj'].set_target_value(self.pre[key]['InitVal']) def scan(self): self.sc=self.scanner.ascan_list(self.adj,self.values, filename=self.branch,start_immediately = False, n_pulses=self.Ns,return_to_initial_values=True) # self.preaction() ###### self.sc.run() # self.auxdata = getAux(self.aux) # self.postaction() ####### def stop(self): if self.sc is None: return self.sc.stop() def running(self): return self.sc.running def status(self): si = self.sc.scan_info.to_dict() steps = 0 if 'scan_values' in si: steps=len(si['scan_values']) return steps,self.N def info(self): return self.sc.scan_info.to_dict()