317 lines
12 KiB
Python
317 lines
12 KiB
Python
import datetime
|
|
import re
|
|
import numpy as np
|
|
|
|
from bsread import dispatcher
|
|
import epics
|
|
|
|
from slic.core.adjustable import PVAdjustable
|
|
from slic.core.acquisition import BSAcquisition, PVAcquisition
|
|
from slic.core.scanner import Scanner
|
|
from sfbd.ext import CounterAdjustable
|
|
from sfbd.ext import BSCAcquisition
|
|
from sfbd.interface import SlicScan
|
|
from bstrd import BSCache
|
|
|
|
def getAux(pvs=None):
    """Read a one-off snapshot of EPICS PVs and return them as floats.

    Parameters
    ----------
    pvs : list of str, optional
        PV names that are handed to ``epics.caget_many``.

    Returns
    -------
    dict or None
        Mapping ``pv name -> float(value)`` for every PV that returned a
        value.  ``None`` is returned when ``pvs`` is empty or not given
        (unchanged behaviour, callers may rely on it).
    """
    if not pvs:
        return None

    values = epics.caget_many(pvs)

    # Only PVs without a reading (None) are skipped.  A plain truth test,
    # as used before, would also discard legitimate readings of 0 / 0.0.
    return {pv: float(value)
            for pv, value in zip(pvs, values)
            if value is not None}
|
|
|
|
|
|
def getBSChannels(regexp):
    """Return the names of all current dispatcher channels matching ``regexp``.

    The pattern is applied with ``re.match``, i.e. it is anchored at the
    beginning of the channel name.  The channel list is taken from
    ``dispatcher.get_current_channels()``; the order of that list is kept.
    """
    pattern = re.compile(regexp)
    names = (channel['name'] for channel in dispatcher.get_current_channels())
    return [name for name in names if pattern.match(name)]
|
|
|
|
def getRFCalibrationChannels(sensors,energy):
    """Derive the auxiliary channel names belonging to a list of sensors.

    For every sensor name containing 'PHASE-VS' three names are generated by
    text substitution (in this order): the vector-sum phase offset and
    amplitude scale with 'RLLE-DSP' replaced by 'RSYS', and 'SM-GET' with
    'RLLE-DSP' replaced by 'RMSM'.  Sensors without 'PHASE-VS' contribute
    nothing.  ``energy`` is always appended as the last element.
    """
    # (replacement for 'PHASE-VS', replacement for 'RLLE-DSP')
    substitutions = (
        ('GET-VSUM-PHASE-OFFSET', 'RSYS'),
        ('GET-VSUM-AMPLT-SCALE', 'RSYS'),
        ('SM-GET', 'RMSM'),
    )
    channels = [
        name.replace('PHASE-VS', field).replace('RLLE-DSP', system)
        for name in sensors
        if 'PHASE-VS' in name
        for field, system in substitutions
    ]
    channels.append(energy)
    return channels
|
|
|
|
|
|
class Dispersion:
    """Dispersion scan of one machine branch, driven through a slic Scanner.

    Typical sequence (see the debug driver at the end of the file):
    construct with a branch name, call ``setup()`` to start the BSCache
    stream, then let ``scan()`` run (normally inside the SlicScan wrapper).

    Each ``setupXXX`` method configures one branch by filling:
      branch, name   - branch tag (also used as file name) and adjustable name
      pre            - PVs that are set to 'Val' before the scan and restored
                       to their previous value afterwards
      adjSV/adjRB    - set-value / read-back PV of the scanned adjustable
      adj2SV/adj2RB  - optional second adjustable, scanned in the opposite
                       direction (None if not used)
      amp            - scan amplitude, multiplied by the scale given in setup()
      sensor         - beam-synchronous channels recorded during the scan
      aux            - PVs read once before the scan (see getAux)
    """

    def __init__(self, branch = 'Aramis'):
        self.scanname = 'Dispersion'
        self.branch = None

        # all output locations are derived from the date of construction
        dirpath = datetime.datetime.now().strftime('measurements/%Y/%m/%d/slic_sfbd')
        self.basedir = '/sf/data/%s/%s' % (dirpath, self.scanname)
        self.pgroup = '%s/%s' % (dirpath, self.scanname)
        self.scandir = '/sf/data/' + self.pgroup

        self.Nsteps = 2
        self.Nsamples = 1
        # scan parameters; overwritten by setup(), defined here so that they
        # exist even if setup() has not been called yet
        self.N = self.Nsteps
        self.Ns = self.Nsamples
        self.scale = 1
        self.bsc = None

        self.setBranch(branch)

    def setBranch(self,branch = 'Aramis'):
        """Configure the object for ``branch`` and return the branch tag.

        Any previously created scan backend is dropped.  For an unknown
        branch name the tag is set to None (and returned); setup() and
        scan() are then no-ops.
        """
        self.sc = None
        if branch == 'Athos Dump':
            self.setupAthosDump()
        elif branch == 'Aramis':
            self.setupAramis()
        elif branch == 'Athos':
            self.setupAthos()
        elif branch == 'Bunch Compressor 2':
            self.setupBC2()
        else:
            self.branch = None
        return self.branch

    def setup(self,scl = 1, Nsteps=5, Nsamples=10):
        """Store the scan parameters and start the BSCache stream.

        scl      - scale factor applied to the branch amplitude ``amp``
        Nsteps   - number of scan points (the scan uses Nsteps-1 intervals)
        Nsamples - number of pulses recorded per scan point
        """
        # the setup is done from the main thread. Therefore I have to define the BSC here
        self.N = Nsteps
        self.Ns = Nsamples
        self.scale = scl

        if not self.branch:
            # No valid branch: there is no sensor list to stream, and scan()
            # would return immediately without ever stopping the cache.
            print('No valid branch selected - BSCache not started')
            return

        # define stream
        print('Getting BSCache')
        # original note: "1000 second timeout, size for 100 second data taken"
        # NOTE(review): units of both arguments are defined by bstrd - confirm
        self.bsc = BSCache(100000, receive_timeout=10000)
        self.bsc.get_vars(self.sensor)  # this starts the stream into the cache
        print('Getting BSCache done')

    def scan(self):
        """Build and run the scan; blocks until the scan has finished.

        Core routine to do all action. Will be a thread of the slic scanner
        wrapper.  Does nothing if no valid branch is selected or if setup()
        has not created the BSCache.  The BSCache is stopped and the pre-scan
        PVs are restored even if the scan aborts with an exception.
        """
        if not self.branch or self.bsc is None:
            return

        try:
            self._prepareScan()

            # get aux data first
            self.auxdata = getAux(self.aux)

            # preaction() stays outside the inner try: if it fails half way,
            # the remaining 'InitVal' entries were never read and must not be
            # written back by postaction().
            self.preaction()
            try:
                self.sc.run()
            finally:
                self.postaction()
        finally:
            self.bsc.stop()  # stop the queue

    def _prepareScan(self):
        # Creates acquisition, scanner, adjustable(s) and the scan backend self.sc.

        # define acquisition channels wrapper for BSQueue
        pgroup = '%s-%s' % (self.pgroup, self.branch)
        acq = [BSCAcquisition(".", pgroup, default_channels=[self.bsc])]

        # define the scanner
        self.scanner = Scanner(data_base_dir=self.basedir,     # root directory of data location e.g. /sf/data/2023/02/02/Dispersion
                               scan_info_dir=self.basedir,     # write also scan info there
                               make_scan_sub_dir=True,         # put each scan in its own directory
                               default_acquisitions=acq)       # acquisitions to use

        # define adjustable
        self.adj = PVAdjustable(self.adjSV, pvname_readback=self.adjRB, accuracy=0.1,
                                ID=self.adjSV, name=self.name)
        val = self.adj.get_current_value(readback=False)
        if self.adj2SV:
            # ID is the set-value PV of the second adjustable (the first
            # adjustable's ID was reused here before, apparently a copy-paste slip)
            self.adj2 = PVAdjustable(self.adj2SV, pvname_readback=self.adj2RB, accuracy=0.1,
                                     ID=self.adj2SV, name=self.name)
            val2 = self.adj2.get_current_value(readback=False)

        dval = self.amp * self.scale
        print(self.adjSV,' - Scan Range:',val-dval,'to',val+dval)
        if self.adj2SV:
            print(self.adj2SV,' - Scan Range:',val2+dval,'to',val2-dval)

        # options common to both scan types
        options = dict(n_intervals=self.N-1,            # steps
                       n_pulses=self.Ns,                # samples
                       filename=self.branch,            # relative directory for data
                       start_immediately=False,         # wait for execution to perform pre-action items
                       return_to_initial_values=True)   # return to initial values

        # create scanner backend
        if self.adj2SV:
            # second adjustable runs from +dval to -dval, i.e. against the first
            self.sc = self.scanner.a2scan(self.adj, val-dval, val+dval,
                                          self.adj2, val2+dval, val2-dval,
                                          **options)
        else:
            self.sc = self.scanner.ascan(self.adj, val-dval, val+dval, **options)

    #################################
    # definition of the individual branches

    def _setPreActions(self, pvnames):
        # Registers the PVs which preaction() sets to 'Val' (0) and which
        # postaction() restores to the value found before the scan.
        # NOTE(review): judging by the PV names these are on/off switches - confirm
        self.pre = {pv: {'Val': 0, 'InitVal': 0, 'adj': PVAdjustable(pv)}
                    for pv in pvnames}

    def _setSensors(self, bpmRegexp, rfRegexp, energyPV):
        # Looks up the recorded channels (first pattern + second pattern) and
        # derives the auxiliary PV list from the channels of the second pattern.
        sensor1 = getBSChannels(bpmRegexp)
        sensor2 = getBSChannels(rfRegexp)
        self.sensor = sensor1 + sensor2
        # auxiliary data to be read once
        self.aux = getRFCalibrationChannels(sensor2, energyPV)

    def setupBC2(self):
        """Configuration for branch 'Bunch Compressor 2'."""
        # branch and name tag
        self.branch = 'Bunch Compressor 2'
        self.name = 'BC2'

        # pre-scan item - needs an adjustment of the dipole current of about 2.3e-3 - to be implemented later.
        self._setPreActions(['SFB_COMPRESSION_BC2_AR:ONOFF1',
                             'SFB_COMPRESSION_BC2_AR:ONOFF2',
                             'SFB_ORBIT_S10:ONOFF1'])

        # adjustable
        self.adjSV = 'S10:SET-E-GAIN-OP'
        self.adjRB = 'S10:GET-E-GAIN-OP'
        self.adj2SV = 'S20:SET-E-GAIN-OP'  # counter adjustable
        self.adj2RB = 'S20:GET-E-GAIN-OP'
        self.amp = 5  # the amplitude of the scan, which can be scaled

        # acquisition and auxiliary data
        self._setSensors('S10[BM].*-DBPM.*:[XY]1$',
                         'S1.*-RLLE-DSP:.*-VS$',
                         'SINBC02-MBND100:ENERGY-OP')

    def setupAthosDump(self):
        """Configuration for branch 'Athos Dump' (branch tag 'Athos_Dump')."""
        # branch and name tag
        self.branch = 'Athos_Dump'
        self.name = 'SATCB01-Linac'

        # pre-scan item - needs an adjustment of the dipole current of about 2.3e-3 - to be implemented later.
        self._setPreActions(['SFB_BEAM_DUMP_AT:ONOFF1',
                             'SFB_ORBIT_SAT:ONOFF1']
                            + ['SFB_ORBIT_SAT_%2.2d:ONOFF1' % i for i in range(1,5)])

        # adjustable
        self.adjSV = 'SATCB01-RSYS:SET-BEAM-PHASE'
        self.adjRB = 'SATCB01-RSYS:GET-BEAM-PHASE'
        self.adj2SV = None
        self.adj2RB = None
        self.amp = 30  # the amplitude of the scan, which can be scaled

        # acquisition and auxiliary data
        self._setSensors('SATBD02-DBPM.*:Y2$',
                         'SATCB.*-RLLE-DSP:.*-VS$',
                         'SATCL01-MBND100:ENERGY-OP')

    def setupAramis(self):
        """Configuration for branch 'Aramis'."""
        # branch and name tag
        self.branch = 'Aramis'
        self.name = 'Linac3'

        # pre-scan item
        self._setPreActions(['SFB_BEAM_DUMP_AR:ONOFF1',
                             'SFB_BEAM_ENERGY_ECOL_AR:ONOFF1',
                             'SFB_ORBIT_S30:ONOFF1',
                             'SFB_ORBIT_SAR:ONOFF1'])

        # adjustable
        self.adjSV = 'S30:SET-E-GAIN-OP'
        self.adjRB = 'S30:GET-E-GAIN-OP'
        self.adj2SV = None
        self.adj2RB = None
        self.amp = 20  # the amplitude of the scan, which can be scaled

        # acquisition and auxiliary data
        self._setSensors('SAR[CMU].*DBPM.*:[XY]1$',
                         'S[23].*-RLLE-DSP:.*-VS$',
                         'S10BC02-MBND100:ENERGY-OP')

    def setupAthos(self):
        """Configuration for branch 'Athos'."""
        # branch and name tag
        self.branch = 'Athos'
        self.name = 'Linac2+3'

        # pre-scan item
        self._setPreActions(['SFB_BEAM_DUMP_AT:ONOFF1',
                             'SFB_BEAM_ENERGY_ECOL_AT:ONOFF1',
                             'SFB_ORBIT_SWY:ONOFF1',
                             'SFB_ORBIT_SAT:ONOFF1']
                            + ['SFB_ORBIT_SAT_%2.2d:ONOFF1' % i for i in range(1,5)])

        # adjustable
        self.adjSV = 'S20:SET-E-GAIN-OP'
        self.adjRB = 'S20:GET-E-GAIN-OP'
        self.adj2SV = 'S30:SET-E-GAIN-OP'  # counter adjustable
        self.adj2RB = 'S30:GET-E-GAIN-OP'
        self.amp = 5  # the amplitude of the scan, which can be scaled

        # acquisition and auxiliary data
        self._setSensors('SAT[SDC].*DBPM.*:[XY]2$',
                         'S[2].*-RLLE-DSP:.*-VS$',
                         'S10BC02-MBND100:ENERGY-OP')

    #########################
    # some basic interaction with the scan functionality

    def preaction(self):
        """Remember the current value of every pre-scan PV, then set it to 'Val'."""
        for item in self.pre.values():
            item['InitVal'] = item['adj'].get_current_value(readback = False)
            item['adj'].set_target_value(item['Val'])

    def postaction(self):
        """Set every pre-scan PV back to the value stored by preaction()."""
        for item in self.pre.values():
            item['adj'].set_target_value(item['InitVal'])

    def stop(self):
        """Stop the scan backend, if one exists."""
        if self.sc is None:
            return
        self.sc.stop()

    def running(self):
        """Return the backend's ``running`` attribute, False without backend."""
        if self.sc is None:
            return False
        return self.sc.running

    def status(self):
        """Return ``(number of recorded scan values, requested steps)``.

        Without a scan backend ``(0, 0)`` is returned.
        """
        if self.sc is None:
            return 0,0
        si = self.sc.scan_info.to_dict()
        steps = len(si['scan_values']) if 'scan_values' in si else 0
        return steps,self.N

    def info(self):
        """Return the scan info of the backend as dict, None without backend."""
        if self.sc is None:
            return None
        return self.sc.scan_info.to_dict()
|
|
|
|
|
|
#--------------------
|
|
# main implementation for debugging
|
|
|
|
|
|
if __name__ == '__main__':
    # Debug run: default branch, scale 1, 10 steps with 100 samples each.
    measurement = Dispersion(branch='Aramis')
    measurement.setup(scl=1, Nsteps=10, Nsamples=100)

    # threaded execution through the SlicScan wrapper
    runner = SlicScan()
    runner.start(measurement, True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|