# File: sfbd/app/dispersiontools.py
# (Python source; 317 lines, 12 KiB in the original listing)

import datetime
import re
import numpy as np
from bsread import dispatcher
import epics
from slic.core.adjustable import PVAdjustable
from slic.core.acquisition import BSAcquisition, PVAcquisition
from slic.core.scanner import Scanner
from sfbd.ext import CounterAdjustable
from sfbd.ext import BSCAcquisition
from sfbd.interface import SlicScan
from bstrd import BSCache
def getAux(pvs=None):
    """Read a list of EPICS PVs once and return their values as floats.

    Parameters
    ----------
    pvs : list of str, optional
        Names of the process variables to read.

    Returns
    -------
    dict or None
        Mapping ``pv name -> float(value)`` for every PV that returned a
        value. PVs for which ``epics.caget_many`` returned ``None`` are left
        out. ``None`` is returned when ``pvs`` is empty or not given.
    """
    if not pvs:
        return None
    values = epics.caget_many(pvs)
    # Only drop readings that are really missing (None). A plain truthiness
    # test would also discard legitimate readings of 0 / 0.0.
    return {pv: float(value) for pv, value in zip(pvs, values) if value is not None}
def getBSChannels(regexp):
    """Return the names of all current dispatcher channels matching ``regexp``.

    The pattern is applied with ``re.match``, i.e. it is anchored at the
    start of the channel name only; use ``$`` in the pattern to anchor the end.
    """
    pattern = re.compile(regexp)
    # walk the channel descriptions lazily, keeping only their names
    names = (entry['name'] for entry in dispatcher.get_current_channels())
    return [name for name in names if pattern.match(name)]
def getRFCalibrationChannels(sensors,energy):
    """Derive the auxiliary channel names belonging to a list of RF sensors.

    For every sensor name containing ``'PHASE-VS'`` three names are generated
    by textual substitution (in this order): the phase offset, the amplitude
    scale and the ``SM-GET`` channel. Sensors without ``'PHASE-VS'`` contribute
    nothing. The ``energy`` channel is appended as the last element.

    Parameters
    ----------
    sensors : iterable of str
        Sensor channel names.
    energy : str
        Channel name appended at the end of the result.

    Returns
    -------
    list of str
    """
    # (replacement for 'PHASE-VS', replacement for 'RLLE-DSP'), in output order
    substitutions = (
        ('GET-VSUM-PHASE-OFFSET', 'RSYS'),
        ('GET-VSUM-AMPLT-SCALE', 'RSYS'),
        ('SM-GET', 'RMSM'),
    )
    channels = [
        name.replace('PHASE-VS', field).replace('RLLE-DSP', system)
        for name in sensors
        if 'PHASE-VS' in name
        for field, system in substitutions
    ]
    channels.append(energy)
    return channels
class Dispersion:
    """Dispersion measurement driven through the slic scanner.

    One (or two, counter-moving) adjustable PVs are scanned symmetrically
    around their current set value while a set of beam-synchronous channels
    is recorded through a ``BSCache``. The set of PVs and channels depends on
    the selected branch (see the ``setup<Branch>`` methods).

    Typical call sequence: ``Dispersion(branch)`` -> ``setup(...)`` (main
    thread) -> ``scan()`` (worker thread); ``stop()``, ``running()``,
    ``status()`` and ``info()`` may be used to monitor the scan.
    """

    def __init__(self, branch = 'Aramis'):
        """Prepare the output locations and configure the given branch.

        Parameters
        ----------
        branch : str
            Branch name understood by :meth:`setBranch`.
        """
        self.scanname = 'Dispersion'
        self.branch = None
        dirpath = datetime.datetime.now().strftime('measurements/%Y/%m/%d/slic_sfbd')
        self.basedir = '/sf/data/%s/%s' % (dirpath, self.scanname)
        self.pgroup = '%s/%s' % (dirpath, self.scanname)
        self.scandir = '/sf/data/' + self.pgroup
        self.setBranch(branch)
        self.Nsteps = 2
        self.Nsamples = 1
        # The scan itself reads N, Ns and scale (normally written by setup()).
        # Give them defaults so the attributes exist on every instance.
        self.N = self.Nsteps
        self.Ns = self.Nsamples
        self.scale = 1
        self.bsc = None

    def setBranch(self,branch = 'Aramis'):
        """Select the branch to be measured and load its configuration.

        Any previously created scan backend is dropped. Unknown branch names
        result in ``self.branch`` being ``None``.

        Returns
        -------
        str or None
            The branch tag set by the matching ``setup<Branch>`` method (not
            necessarily identical to the requested name), or ``None``.
        """
        self.sc = None
        configure = {
            'Athos Dump': self.setupAthosDump,
            'Aramis': self.setupAramis,
            'Athos': self.setupAthos,
            'Bunch Compressor 2': self.setupBC2,
        }.get(branch)
        if configure is None:
            self.branch = None
        else:
            configure()
        return self.branch

    def setup(self,scl = 1, Nsteps=5, Nsamples=10):
        """Define the scan size and start streaming the sensor channels.

        This is called from the main thread, therefore the ``BSCache`` has to
        be created here and not in :meth:`scan`.

        Parameters
        ----------
        scl : float
            Factor applied to the branch specific scan amplitude ``self.amp``.
        Nsteps : int
            Number of scan points.
        Nsamples : int
            Number of pulses recorded per scan point.
        """
        self.N = Nsteps
        self.Ns = Nsamples
        self.scale = scl
        # define stream
        print('Getting BSCache')
        # NOTE(review): the original comment states "1000 second timeout, size
        # for 100 second data taken" - confirm the units of both arguments.
        self.bsc = BSCache(100000, receive_timeout=10000)
        self.bsc.get_vars(self.sensor)  # this starts the stream into the cache
        print('Getting BSCache done')

    def scan(self):
        """Core routine doing the complete scan; run as a thread of the slic wrapper.

        Returns immediately when no branch is configured.

        Raises
        ------
        RuntimeError
            If :meth:`setup` has not been called (no ``BSCache`` available).
        """
        if not self.branch:
            return
        if self.bsc is None:
            # fail early and clearly instead of part-way through the set-up
            raise RuntimeError('Dispersion.setup() must be called before scan()')

        # acquisition wrapper around the BSCache
        pgroup = '%s-%s' % (self.pgroup, self.branch)
        acq = [BSCAcquisition(".", pgroup, default_channels=[self.bsc])]

        # define the scanner
        self.scanner = Scanner(data_base_dir=self.basedir,      # root directory of the data
                               scan_info_dir=self.basedir,      # write also scan info there
                               make_scan_sub_dir=True,          # put each scan in its own directory
                               default_acquisitions=acq)        # acquisitions to use

        # define adjustable(s) and read their current set values
        self.adj = PVAdjustable(self.adjSV, pvname_readback = self.adjRB,
                                accuracy = 0.1, ID = self.adjSV, name = self.name)
        val = self.adj.get_current_value(readback=False)
        if self.adj2SV:
            # The ID is the second knob's own PV; it used to be the first
            # knob's PV (copy/paste slip), making both adjustables share one ID.
            self.adj2 = PVAdjustable(self.adj2SV, pvname_readback = self.adj2RB,
                                     accuracy = 0.1, ID = self.adj2SV, name = self.name)
            val2 = self.adj2.get_current_value(readback=False)

        dval = self.amp*self.scale
        print(self.adjSV,' - Scan Range:',val-dval,'to',val+dval)

        # create scanner backend
        if self.adj2SV:
            # the second adjustable moves in the opposite direction
            print(self.adj2SV,' - Scan Range:',val2+dval,'to',val2-dval)
            self.sc = self.scanner.a2scan(self.adj, val-dval, val+dval,
                                          self.adj2, val2+dval, val2-dval,
                                          n_intervals = self.N-1,          # steps
                                          n_pulses = self.Ns,              # samples
                                          filename=self.branch,            # relative directory for data
                                          start_immediately = False,       # wait, pre-action items come first
                                          return_to_initial_values=True)   # return to initial values
        else:
            self.sc = self.scanner.ascan(self.adj, val-dval, val+dval,
                                         n_intervals = self.N-1,           # steps
                                         n_pulses = self.Ns,               # samples
                                         filename=self.branch,             # relative directory for data
                                         start_immediately = False,        # wait, pre-action items come first
                                         return_to_initial_values=True)    # return to initial values

        # get aux data first
        self.auxdata = getAux(self.aux)

        try:
            self.preaction()
            try:
                self.sc.run()
            finally:
                # restore the pre-scan PVs even if the scan raised
                self.postaction()
        finally:
            self.bsc.stop()  # stop the queue

    #################################
    # definition of the individual branches

    def _definePreAction(self, pvs):
        """Build ``self.pre`` for the given PV names.

        Every entry holds the value applied before the scan (``'Val'``, 0),
        a slot for the value found before the scan (``'InitVal'``) and the
        ``PVAdjustable`` used to access the PV (``'adj'``).
        """
        self.pre = {pv: {'Val': 0, 'InitVal': 0, 'adj': PVAdjustable(pv)} for pv in pvs}

    def setupBC2(self):
        """Configuration for the branch 'Bunch Compressor 2'."""
        # branch and name tag
        self.branch = 'Bunch Compressor 2'
        self.name = 'BC2'
        # pre-scan item - needs an adjustment of the dipole current of about 2.3e-3 - to be implemented later.
        self._definePreAction([
            'SFB_COMPRESSION_BC2_AR:ONOFF1',
            'SFB_COMPRESSION_BC2_AR:ONOFF2',
            'SFB_ORBIT_S10:ONOFF1',
        ])
        # adjustable
        self.adjSV = 'S10:SET-E-GAIN-OP'
        self.adjRB = 'S10:GET-E-GAIN-OP'
        self.adj2SV = 'S20:SET-E-GAIN-OP'  # counter adjustable
        self.adj2RB = 'S20:GET-E-GAIN-OP'
        self.amp = 5  # the amplitude of the scan, which can be scaled
        # acquisition
        sensor1 = getBSChannels('S10[BM].*-DBPM.*:[XY]1$')
        sensor2 = getBSChannels('S1.*-RLLE-DSP:.*-VS$')
        self.sensor = sensor1+sensor2
        # auxiliary data, to be read once
        self.aux = getRFCalibrationChannels(sensor2,'SINBC02-MBND100:ENERGY-OP')

    def setupAthosDump(self):
        """Configuration for the Athos dump (branch tag 'Athos_Dump')."""
        # branch and name tag
        self.branch = 'Athos_Dump'
        self.name = 'SATCB01-Linac'
        # pre-scan item - needs an adjustment of the dipole current of about 2.3e-3 - to be implemented later.
        self._definePreAction(
            ['SFB_BEAM_DUMP_AT:ONOFF1', 'SFB_ORBIT_SAT:ONOFF1']
            + ['SFB_ORBIT_SAT_%2.2d:ONOFF1' % i for i in range(1,5)]
        )
        # adjustable
        self.adjSV = 'SATCB01-RSYS:SET-BEAM-PHASE'
        self.adjRB = 'SATCB01-RSYS:GET-BEAM-PHASE'
        self.adj2SV = None
        self.adj2RB = None
        self.amp = 30  # the amplitude of the scan, which can be scaled
        # acquisition
        sensor1 = getBSChannels('SATBD02-DBPM.*:Y2$')
        sensor2 = getBSChannels('SATCB.*-RLLE-DSP:.*-VS$')
        self.sensor = sensor1+sensor2
        # auxiliary data, to be read once
        self.aux = getRFCalibrationChannels(sensor2,'SATCL01-MBND100:ENERGY-OP')

    def setupAramis(self):
        """Configuration for the branch 'Aramis'."""
        # branch and name tag
        self.branch = 'Aramis'
        self.name = 'Linac3'
        # pre-scan item
        self._definePreAction([
            'SFB_BEAM_DUMP_AR:ONOFF1',
            'SFB_BEAM_ENERGY_ECOL_AR:ONOFF1',
            'SFB_ORBIT_S30:ONOFF1',
            'SFB_ORBIT_SAR:ONOFF1',
        ])
        # adjustable
        self.adjSV = 'S30:SET-E-GAIN-OP'
        self.adjRB = 'S30:GET-E-GAIN-OP'
        self.adj2SV = None
        self.adj2RB = None
        self.amp = 20  # the amplitude of the scan, which can be scaled
        # acquisition
        sensor1 = getBSChannels('SAR[CMU].*DBPM.*:[XY]1$')
        sensor2 = getBSChannels('S[23].*-RLLE-DSP:.*-VS$')
        self.sensor = sensor1+sensor2
        # auxiliary data, to be read once
        self.aux = getRFCalibrationChannels(sensor2,'S10BC02-MBND100:ENERGY-OP')

    def setupAthos(self):
        """Configuration for the branch 'Athos'."""
        # branch and name tag
        self.branch = 'Athos'
        self.name = 'Linac2+3'
        # pre-scan item
        self._definePreAction(
            [
                'SFB_BEAM_DUMP_AT:ONOFF1',
                'SFB_BEAM_ENERGY_ECOL_AT:ONOFF1',
                'SFB_ORBIT_SWY:ONOFF1',
                'SFB_ORBIT_SAT:ONOFF1',
            ]
            + ['SFB_ORBIT_SAT_%2.2d:ONOFF1' % i for i in range(1,5)]
        )
        # adjustable
        self.adjSV = 'S20:SET-E-GAIN-OP'
        self.adjRB = 'S20:GET-E-GAIN-OP'
        self.adj2SV = 'S30:SET-E-GAIN-OP'  # counter adjustable
        self.adj2RB = 'S30:GET-E-GAIN-OP'
        self.amp = 5  # the amplitude of the scan, which can be scaled
        # acquisition
        sensor1 = getBSChannels('SAT[SDC].*DBPM.*:[XY]2$')
        sensor2 = getBSChannels('S[2].*-RLLE-DSP:.*-VS$')
        self.sensor = sensor1+sensor2
        # auxiliary data, to be read once
        self.aux = getRFCalibrationChannels(sensor2,'S10BC02-MBND100:ENERGY-OP')

    #########################
    # some basic interaction with the scan functionality

    def preaction(self):
        """Remember the current set value of every pre-scan PV, then apply its 'Val'."""
        for item in self.pre.values():
            item['InitVal'] = item['adj'].get_current_value(readback = False)
            item['adj'].set_target_value(item['Val'])

    def postaction(self):
        """Write the values remembered by :meth:`preaction` back to the pre-scan PVs."""
        for item in self.pre.values():
            item['adj'].set_target_value(item['InitVal'])

    def stop(self):
        """Request the scan backend to stop; no-op when no scan was created."""
        if self.sc is None:
            return
        self.sc.stop()

    def running(self):
        """Return the ``running`` state of the scan backend (``False`` without a scan)."""
        if self.sc is None:
            return False
        return self.sc.running

    def status(self):
        """Return ``(steps done, total steps)``; ``(0, 0)`` when no scan was created.

        The number of steps done is the length of ``'scan_values'`` in the
        scan info, or 0 when that entry is not present.
        """
        if self.sc is None:
            return 0,0
        si = self.sc.scan_info.to_dict()
        steps = len(si['scan_values']) if 'scan_values' in si else 0
        return steps,self.N

    def info(self):
        """Return the scan info as dictionary, or ``None`` when no scan was created."""
        if self.sc is None:
            return None
        return self.sc.scan_info.to_dict()
#--------------------
# stand-alone entry point, meant for debugging only
if __name__ == '__main__':
    measurement = Dispersion()
    measurement.setup(scl=1, Nsteps=10, Nsamples=100)
    # hand the measurement over to the slic wrapper for threaded execution
    runner = SlicScan()
    runner.start(measurement, True)