import datetime
import re
import numpy as np
from bsread import dispatcher
import epics
from slic.core.adjustable import PVAdjustable
from slic.core.acquisition import BSAcquisition, PVAcquisition
from slic.core.scanner import Scanner
from sfbd.ext import CounterAdjustable
from sfbd.ext import BSCAcquisition
from sfbd.interface import SlicScan
from bstrd import BSCache


def getAux(pvs=None):
    """Read a list of EPICS PVs once and return their values as floats.

    Args:
        pvs: sequence of PV names; ``None`` or an empty sequence is allowed.

    Returns:
        ``None`` when *pvs* is empty/``None``; otherwise a dict mapping each
        PV name to ``float(value)``.  PVs for which ``epics.caget_many``
        delivered ``None`` are left out.
    """
    if not pvs:
        return None
    values = epics.caget_many(pvs)
    # Only None marks a missing reading.  A plain truthiness test would also
    # discard legitimate zero readings (e.g. a phase offset of 0.0).
    return {pv: float(value) for pv, value in zip(pvs, values) if value is not None}


def getBSChannels(regexp):
    """Return the names of all currently available BS channels matching *regexp*.

    The pattern is applied with ``re.match``, i.e. anchored at the start of
    the channel name.
    """
    prog = re.compile(regexp)
    return [bs['name'] for bs in dispatcher.get_current_channels()
            if prog.match(bs['name'])]


def getRFCalibrationChannels(sensors, energy):
    """Derive the auxiliary channel names belonging to a list of RF sensors.

    For every sensor name containing ``'PHASE-VS'`` three names are generated
    by string substitution (phase offset, amplitude scale and ``SM-GET``).
    The *energy* channel name is always appended as the last entry.

    Args:
        sensors: iterable of sensor channel names.
        energy: channel name appended at the end of the result.

    Returns:
        List of channel names.
    """
    aux = []
    for sen in sensors:
        if 'PHASE-VS' in sen:
            aux.append(sen.replace('PHASE-VS', 'GET-VSUM-PHASE-OFFSET').replace('RLLE-DSP', 'RSYS'))
            aux.append(sen.replace('PHASE-VS', 'GET-VSUM-AMPLT-SCALE').replace('RLLE-DSP', 'RSYS'))
            aux.append(sen.replace('PHASE-VS', 'SM-GET').replace('RLLE-DSP', 'RMSM'))
    aux.append(energy)
    return aux


class Dispersion:
    """Dispersion scan for one of the supported machine branches.

    Typical use: construct (optionally selecting a branch), call ``setup()``
    from the main thread, then run ``scan()`` (e.g. through the SlicScan
    thread wrapper).  ``stop()``, ``running()``, ``status()`` and ``info()``
    give access to the scan backend while it exists.
    """

    def __init__(self, branch='Aramis'):
        """Set up the data directories and select the initial *branch*."""
        self.scanname = 'Dispersion'
        self.branch = None

        dirpath = datetime.datetime.now().strftime('measurements/%Y/%m/%d/slic_sfbd')
        self.basedir = '/sf/data/%s/%s' % (dirpath, self.scanname)
        self.pgroup = '%s/%s' % (dirpath, self.scanname)
        self.scandir = '/sf/data/' + self.pgroup

        self.setBranch(branch)  # enforce Athos dump settings for now
        self.Nsteps = 2
        self.Nsamples = 1
        self.bsc = None

    def setBranch(self, branch='Aramis'):
        """Select the branch to be scanned and load its configuration.

        Recognised values are ``'Athos Dump'``, ``'Aramis'``, ``'Athos'`` and
        ``'Bunch Compressor 2'``.  Any previously created scan backend is
        dropped.

        Returns:
            The branch tag stored in ``self.branch`` by the branch setup, or
            ``None`` when *branch* is not recognised.
        """
        self.sc = None
        if branch == 'Athos Dump':
            self.setupAthosDump()
        elif branch == 'Aramis':
            self.setupAramis()
        elif branch == 'Athos':
            self.setupAthos()
        elif branch == 'Bunch Compressor 2':
            self.setupBC2()
        else:
            self.branch = None
        return self.branch

    def setup(self, scl=1, Nsteps=5, Nsamples=10):
        """Store the scan parameters and open the BS data cache.

        The setup is done from the main thread, therefore the BSCache has to
        be defined here.

        Args:
            scl: factor applied to the branch's scan amplitude ``self.amp``.
            Nsteps: number of scan steps.
            Nsamples: number of samples (pulses) per step.
        """
        self.N = Nsteps
        self.Ns = Nsamples
        self.scale = scl
        # define stream
        print('Getting BSCache')
        # Original author's note: "1000 second timeout, size for 100 second
        # data taken".
        # NOTE(review): the units of receive_timeout and of the cache size
        # cannot be verified from this file - confirm against BSCache.
        self.bsc = BSCache(100000, receive_timeout=10000)
        self.bsc.get_vars(self.sensor)  # this starts the stream into the cache
        print('Getting BSCache done')

    def scan(self):
        """Core routine doing all the action of one scan.

        Will be a thread of the slic scanner wrapper.  Does nothing when no
        valid branch is selected.  ``setup()`` must have been called before,
        since the scan uses ``self.N``, ``self.Ns``, ``self.scale`` and
        ``self.bsc`` defined there.
        """
        if not self.branch:
            return

        # define acquisition channels wrapper for BSQueue
        pgroup = '%s-%s' % (self.pgroup, self.branch)
        acq = [BSCAcquisition(".", pgroup, default_channels=[self.bsc])]

        # define the scanner
        self.scanner = Scanner(
            data_base_dir=self.basedir,    # root directory of data location, e.g. /sf/data/measurements/2023/02/02/slic_sfbd/Dispersion
            scan_info_dir=self.basedir,    # write also scan info there
            make_scan_sub_dir=True,        # put each scan in its own directory
            default_acquisitions=acq)      # acquisitions to use

        # define adjustable(s)
        self.adj = PVAdjustable(self.adjSV, pvname_readback=self.adjRB,
                                accuracy=0.1, ID=self.adjSV, name=self.name)
        val = self.adj.get_current_value(readback=False)
        if self.adj2SV:
            # The second adjustable is identified by its own set-value PV.
            # It previously reused self.adjSV (copy-paste error), which gave
            # both adjustables the same ID.
            self.adj2 = PVAdjustable(self.adj2SV, pvname_readback=self.adj2RB,
                                     accuracy=0.1, ID=self.adj2SV, name=self.name)
            val2 = self.adj2.get_current_value(readback=False)

        dval = self.amp * self.scale
        print(self.adjSV, ' - Scan Range:', val - dval, 'to', val + dval)

        # options shared by both scan flavours
        scan_options = dict(
            n_intervals=self.N - 1,          # steps
            n_pulses=self.Ns,                # samples
            filename=self.branch,            # relative directory for data
            start_immediately=False,         # wait for execution to perform pre-action items
            return_to_initial_values=True)   # return to initial values

        # create scanner backend
        if self.adj2SV:
            # the second adjustable is scanned in the opposite direction
            print(self.adj2SV, ' - Scan Range:', val2 + dval, 'to', val2 - dval)
            self.sc = self.scanner.a2scan(self.adj, val - dval, val + dval,
                                          self.adj2, val2 + dval, val2 - dval,
                                          **scan_options)
        else:
            self.sc = self.scanner.ascan(self.adj, val - dval, val + dval,
                                         **scan_options)

        try:
            # get aux data first
            self.auxdata = getAux(self.aux)
            self.preaction()
            try:
                self.sc.run()
            finally:
                # restore the pre-scan PVs even if the scan raised
                self.postaction()
        finally:
            # always stop the queue so the stream is not left running
            if self.bsc is not None:
                self.bsc.stop()

    #################################
    # definition of the individual branches

    def _definePreaction(self, pvs):
        """Build ``self.pre`` for the given pre-scan PV names.

        Each entry holds the value applied for the scan (``'Val'``, 0), a
        placeholder for the value to restore afterwards (``'InitVal'``) and
        the ``PVAdjustable`` for that PV (``'adj'``).
        """
        self.pre = {pv: {'Val': 0, 'InitVal': 0, 'adj': PVAdjustable(pv)}
                    for pv in pvs}

    def setupBC2(self):
        """Load the configuration of the 'Bunch Compressor 2' branch."""
        # branch and name tag
        self.branch = 'Bunch Compressor 2'
        self.name = 'BC2'

        # pre-scan item - needs an adjustment of the dipole current of about 2.3e-3 - to be implemented later.
        self._definePreaction([
            'SFB_COMPRESSION_BC2_AR:ONOFF1',
            'SFB_COMPRESSION_BC2_AR:ONOFF2',
            'SFB_ORBIT_S10:ONOFF1',
        ])

        # adjustable
        self.adjSV = 'S10:SET-E-GAIN-OP'
        self.adjRB = 'S10:GET-E-GAIN-OP'
        self.adj2SV = 'S20:SET-E-GAIN-OP'  # counter adjustable
        self.adj2RB = 'S20:GET-E-GAIN-OP'
        self.amp = 5  # the amplitude of the scan, which can be scaled

        # acquisition
        sensor1 = getBSChannels('S10[BM].*-DBPM.*:[XY]1$')
        sensor2 = getBSChannels('S1.*-RLLE-DSP:.*-VS$')
        self.sensor = sensor1 + sensor2

        # auxiliary data to be read once
        self.aux = getRFCalibrationChannels(sensor2, 'SINBC02-MBND100:ENERGY-OP')

    def setupAthosDump(self):
        """Load the configuration of the 'Athos Dump' branch."""
        # branch and name tag
        self.branch = 'Athos_Dump'
        self.name = 'SATCB01-Linac'

        # pre-scan item - needs an adjustment of the dipole current of about 2.3e-3 - to be implemented later.
        self._definePreaction(
            ['SFB_BEAM_DUMP_AT:ONOFF1', 'SFB_ORBIT_SAT:ONOFF1']
            + ['SFB_ORBIT_SAT_%2.2d:ONOFF1' % i for i in range(1, 5)])

        # adjustable
        self.adjSV = 'SATCB01-RSYS:SET-BEAM-PHASE'
        self.adjRB = 'SATCB01-RSYS:GET-BEAM-PHASE'
        self.adj2SV = None
        self.adj2RB = None
        self.amp = 30  # the amplitude of the scan, which can be scaled

        # acquisition
        sensor1 = getBSChannels('SATBD02-DBPM.*:Y2$')
        sensor2 = getBSChannels('SATCB.*-RLLE-DSP:.*-VS$')
        self.sensor = sensor1 + sensor2

        # auxiliary data to be read once
        self.aux = getRFCalibrationChannels(sensor2, 'SATCL01-MBND100:ENERGY-OP')

    def setupAramis(self):
        """Load the configuration of the 'Aramis' branch."""
        # branch and name tag
        self.branch = 'Aramis'
        self.name = 'Linac3'

        # pre-scan item
        self._definePreaction([
            'SFB_BEAM_DUMP_AR:ONOFF1',
            'SFB_BEAM_ENERGY_ECOL_AR:ONOFF1',
            'SFB_ORBIT_S30:ONOFF1',
            'SFB_ORBIT_SAR:ONOFF1',
        ])

        # adjustable
        self.adjSV = 'S30:SET-E-GAIN-OP'
        self.adjRB = 'S30:GET-E-GAIN-OP'
        self.adj2SV = None
        self.adj2RB = None
        self.amp = 20  # the amplitude of the scan, which can be scaled

        # acquisition
        sensor1 = getBSChannels('SAR[CMU].*DBPM.*:[XY]1$')
        sensor2 = getBSChannels('S[23].*-RLLE-DSP:.*-VS$')
        self.sensor = sensor1 + sensor2

        # auxiliary data to be read once
        self.aux = getRFCalibrationChannels(sensor2, 'S10BC02-MBND100:ENERGY-OP')

    def setupAthos(self):
        """Load the configuration of the 'Athos' branch."""
        # branch and name tag
        self.branch = 'Athos'
        self.name = 'Linac2+3'

        # pre-scan item
        self._definePreaction(
            ['SFB_BEAM_DUMP_AT:ONOFF1',
             'SFB_BEAM_ENERGY_ECOL_AT:ONOFF1',
             'SFB_ORBIT_SWY:ONOFF1',
             'SFB_ORBIT_SAT:ONOFF1']
            + ['SFB_ORBIT_SAT_%2.2d:ONOFF1' % i for i in range(1, 5)])

        # adjustable
        self.adjSV = 'S20:SET-E-GAIN-OP'
        self.adjRB = 'S20:GET-E-GAIN-OP'
        self.adj2SV = 'S30:SET-E-GAIN-OP'  # counter adjustable
        self.adj2RB = 'S30:GET-E-GAIN-OP'
        self.amp = 5  # the amplitude of the scan, which can be scaled

        # acquisition
        sensor1 = getBSChannels('SAT[SDC].*DBPM.*:[XY]2$')
        sensor2 = getBSChannels('S[2].*-RLLE-DSP:.*-VS$')
        self.sensor = sensor1 + sensor2

        # auxiliary data to be read once
        self.aux = getRFCalibrationChannels(sensor2, 'S10BC02-MBND100:ENERGY-OP')

    #########################
    # some basic interaction with the scan functionality

    def preaction(self):
        """Remember the current value of every pre-scan PV, then apply its scan value."""
        for item in self.pre.values():
            item['InitVal'] = item['adj'].get_current_value(readback=False)
            item['adj'].set_target_value(item['Val'])

    def postaction(self):
        """Set every pre-scan PV back to the value remembered by ``preaction()``."""
        for item in self.pre.values():
            item['adj'].set_target_value(item['InitVal'])

    def stop(self):
        """Stop the scan backend, if one exists."""
        if self.sc is None:
            return
        self.sc.stop()

    def running(self):
        """Return the backend's ``running`` attribute, or ``False`` without a backend."""
        if self.sc is None:
            return False
        return self.sc.running

    def status(self):
        """Return ``(steps, total)`` for the scan.

        ``steps`` is the number of entries under ``'scan_values'`` in the
        backend's scan info (0 if absent) and ``total`` is the configured
        number of steps.  Without a backend ``(0, 0)`` is returned.
        """
        if self.sc is None:
            return 0, 0
        si = self.sc.scan_info.to_dict()
        steps = len(si['scan_values']) if 'scan_values' in si else 0
        return steps, self.N

    def info(self):
        """Return the backend's scan info as a dict, or ``None`` without a backend."""
        if self.sc is None:
            return None
        return self.sc.scan_info.to_dict()


# --------------------
# main implementation for debugging

if __name__ == '__main__':
    daq = Dispersion()
    daq.setup(1, 10, 100)
    # threaded execution
    scanner = SlicScan()
    scanner.start(daq, True)