# Source listing: sfbd/app/dispersiontools.py (217 lines, 8.1 KiB, Python)

import datetime
import re
import numpy as np
from bsread import dispatcher
import epics
from slic.core.adjustable import PVAdjustable
from slic.core.acquisition import BSAcquisition, PVAcquisition
from slic.core.scanner import Scanner
from sfbd.ext import CounterAdjustable
from sfbd.ext import BSCAcquisition
def getAux(pvs=None):
    """Read a set of EPICS PVs once and return their values as floats.

    Args:
        pvs: list of PV names to read. ``None`` or an empty list means
            there is nothing to read.

    Returns:
        ``None`` when ``pvs`` is empty or ``None``; otherwise a dict mapping
        each PV name to ``float(value)``. PVs for which ``caget_many``
        returned ``None`` are left out of the dict.

    Raises:
        TypeError/ValueError: propagated from ``float()`` if a PV value
            cannot be converted to a float.
    """
    if not pvs:
        return None
    values = epics.caget_many(pvs)
    # Test against None explicitly: a plain truthiness check would also
    # discard legitimate readings of 0.
    ret = {
        pv: float(value)
        for pv, value in zip(pvs, values)
        if value is not None
    }
    epics.ca.clear_cache()
    return ret
def getBSChannels(regexp):
    """Return the names of the current dispatcher channels matching ``regexp``.

    The pattern is applied with ``re.match``, i.e. it is anchored at the
    start of the channel name only.

    Args:
        regexp: regular expression (string) tested against each channel name.

    Returns:
        List of matching channel names, in the order reported by
        ``dispatcher.get_current_channels()``.
    """
    pattern = re.compile(regexp)
    return [
        channel['name']
        for channel in dispatcher.get_current_channels()
        if pattern.match(channel['name'])
    ]
class Dispersion:
    """Configure and run a 'Dispersion' scan for one of the supported branches.

    Supported branch names are 'Aramis', 'Athos' and 'Athos Dump'. Selecting
    a branch defines the scanned adjustable (``self.adj``), the default scan
    amplitude (``self.amp``), the recorded channels (``self.sensor``), the
    acquisition (``self.acq``), the auxiliary PVs (``self.aux``) and the
    pre-scan PVs (``self.pre``), and creates the ``Scanner``.

    Typical call order: ``setBranch`` (done by the constructor), ``setup``,
    ``scan``; ``running``/``status``/``info``/``stop`` inspect or control the
    scan held in ``self.sc``.
    """

    def __init__(self, branch='Aramis'):
        """Build the output locations and configure the requested branch.

        Args:
            branch: 'Aramis', 'Athos' or 'Athos Dump'. Any other value leaves
                the object without a scanner (see ``setBranch``).
        """
        self.scanname = 'Dispersion'
        # Placeholder (the string 'None') until a setup routine sets the label.
        self.branch = 'None'
        # The date part of the path is fixed when the object is created.
        dirpath = datetime.datetime.now().strftime('measurements/%Y/%m/%d/slic_sfbd')
        self.basedir = '/sf/data/%s/%s' % (dirpath, self.scanname)
        self.pgroup = '%s/%s' % (dirpath, self.scanname)
        self.scandir = '/sf/data/' + self.pgroup
        self.sc = None
        self.Nsteps = 2
        self.Nsamples = 1
        # setup() overwrites these; initialising them here lets status() be
        # called before setup() without an AttributeError.
        self.N = self.Nsteps
        self.Ns = self.Nsamples
        self.setBranch(branch)

    def setBranch(self, branch='Aramis'):
        """Select the branch and (re)create the scanner for it.

        Any previously prepared scan is dropped. For an unknown branch name
        ``self.scanner`` is set to ``None`` and nothing else is configured.

        Args:
            branch: 'Aramis', 'Athos' or 'Athos Dump'.
        """
        self.sc = None
        configure = {
            'Athos Dump': self.setupAthosDump,
            'Aramis': self.setupAramis,
            'Athos': self.setupAthos,
        }.get(branch)
        if configure is None:
            self.scanner = None
            return
        configure()
        # The scanner writes below self.basedir; self.acq was defined by the
        # branch set-up routine just called.
        self.scanner = Scanner(data_base_dir=self.basedir,
                               scan_info_dir=self.basedir,
                               make_scan_sub_dir=True,
                               default_acquisitions=self.acq)

    def getRFCalibrationChannels(self, sensor2, energy):
        """Derive the auxiliary channel names belonging to the RF sensors.

        For every name in ``sensor2`` containing 'PHASE-VS', three names are
        generated by string substitution (phase offset, amplitude scale and
        'SM-GET'). ``energy`` is appended as the last entry.

        Args:
            sensor2: iterable of channel names.
            energy: channel name appended at the end of the list.

        Returns:
            List of derived channel names followed by ``energy``.
        """
        aux = []
        for sen in sensor2:
            if 'PHASE-VS' not in sen:
                continue
            aux.append(sen.replace('PHASE-VS', 'GET-VSUM-PHASE-OFFSET').replace('RLLE-DSP', 'RSYS'))
            aux.append(sen.replace('PHASE-VS', 'GET-VSUM-AMPLT-SCALE').replace('RLLE-DSP', 'RSYS'))
            aux.append(sen.replace('PHASE-VS', 'SM-GET').replace('RLLE-DSP', 'RMSM'))
        aux.append(energy)
        return aux

    def _setBranchPaths(self, branch):
        """Store the branch label, set ``self.path`` and return the branch pgroup."""
        self.branch = branch
        pgroup = '%s-%s' % (self.pgroup, self.branch)
        self.path = '/sf/data/' + pgroup
        return pgroup

    def _setPreActions(self, pvnames):
        """Create ``self.pre``: one entry with target value 0 and an adjustable per PV."""
        self.pre = {
            pv: {'Val': 0, 'InitVal': 0, 'adj': PVAdjustable(pv)}
            for pv in pvnames
        }

    def _setAcquisition(self, pgroup, bpmRegexp, rfRegexp, energyPV):
        """Look up the sensor channels and define ``self.sensor``, ``self.acq`` and ``self.aux``."""
        sensor1 = getBSChannels(bpmRegexp)
        sensor2 = getBSChannels(rfRegexp)
        self.sensor = sensor1 + sensor2
        self.acq = [BSAcquisition(".", pgroup, default_channels=self.sensor)]
        # auxiliary data, meant to be read once per scan
        self.aux = self.getRFCalibrationChannels(sensor2, energyPV)

    def setupAthosDump(self):
        """Configure adjustable, pre-scan PVs and acquisition for 'Athos Dump'."""
        pgroup = self._setBranchPaths('Athos_Dump')
        # pre-scan item - needs an adjustment of the dipole current of about
        # 2.3e-3 - to be implemented later.
        self._setPreActions(
            ['SFB_BEAM_DUMP_AT:ONOFF1',
             'SFB_ORBIT_SAT:ONOFF1']
            + ['SFB_ORBIT_SAT_%2.2d:ONOFF1' % i for i in range(1, 5)]
        )
        # adjustable
        self.adjSV = 'SATCB01-RSYS:SET-BEAM-PHASE'
        self.adjRB = 'SATCB01-RSYS:GET-BEAM-PHASE'
        self.adj = PVAdjustable(self.adjSV, pvname_readback=self.adjRB,
                                accuracy=0.1, ID=self.adjSV,
                                name="SATCB01-Linac")
        self.amp = 30  # the amplitude of the scan, which can be scaled
        # acquisition
        self._setAcquisition(pgroup,
                             'SATBD02-DBPM.*:Y2$',
                             'SATCB.*-RLLE-DSP:.*-VS$',
                             'SATCL01-MBND100:ENERGY-OP')

    def setupAramis(self):
        """Configure adjustable, pre-scan PVs and acquisition for 'Aramis'."""
        pgroup = self._setBranchPaths('Aramis')
        # pre-scan item
        self._setPreActions(
            ['SFB_BEAM_DUMP_AR:ONOFF1',
             'SFB_BEAM_ENERGY_ECOL_AR:ONOFF1',
             'SFB_ORBIT_S30:ONOFF1',
             'SFB_ORBIT_SAR:ONOFF1']
        )
        # adjustable
        self.adjSV = 'S30:SET-E-GAIN-OP'
        self.adjRB = 'S30:GET-E-GAIN-OP'
        self.adj = PVAdjustable(self.adjSV, pvname_readback=self.adjRB,
                                accuracy=0.1, ID=self.adjSV,
                                name="Linac3")
        self.amp = 20  # the amplitude of the scan, which can be scaled
        # acquisition
        self._setAcquisition(pgroup,
                             'SAR.*DBPM.*:[XY]1$',
                             'S[23].*-RLLE-DSP:.*-VS$',
                             'S10BC02-MBND100:ENERGY-OP')

    def setupAthos(self):
        """Configure adjustable, pre-scan PVs and acquisition for 'Athos'."""
        # Label the branch 'Athos' (was 'Aramis', apparently copied from
        # setupAramis), so paths and file names do not collide with Aramis.
        pgroup = self._setBranchPaths('Athos')
        # pre-scan item
        self._setPreActions(
            ['SFB_BEAM_DUMP_AT:ONOFF1',
             'SFB_BEAM_ENERGY_ECOL_AT:ONOFF1',
             'SFB_ORBIT_SWY:ONOFF1',
             'SFB_ORBIT_SAT:ONOFF1']
            + ['SFB_ORBIT_SAT_%2.2d:ONOFF1' % i for i in range(1, 5)]
        )
        # adjustable
        self.adjSV = 'S20:SET-E-GAIN-OP'
        self.adjRB = 'S20:GET-E-GAIN-OP'
        # NOTE(review): only the S20 energy gain is scanned. Combining it with
        # 'S30:SET-E-GAIN-OP' / 'S30:GET-E-GAIN-OP' through CounterAdjustable
        # was drafted but is not enabled.
        self.adj = PVAdjustable(self.adjSV, pvname_readback=self.adjRB,
                                accuracy=0.1, ID=self.adjSV,
                                name="Linac 2 and 3")
        self.amp = 10  # the amplitude of the scan, which can be scaled
        # acquisition
        self._setAcquisition(pgroup,
                             'SAT[SDC].*DBPM.*:[XY]2$',
                             'S[2].*-RLLE-DSP:.*-VS$',
                             'S10BC02-MBND100:ENERGY-OP')

    def setup(self, scl=1, Nsteps=5, Nsamples=5):
        """Define the scan positions around the current set value.

        Args:
            scl: scale factor for the scan amplitude (currently unused, see
                the note below).
            Nsteps: number of scan positions.
            Nsamples: number of pulses recorded per position.
        """
        val = self.adj.get_current_value(readback=False)
        # NOTE(review): the amplitude is hard-wired to 0 (marked "edit this"
        # in the original), so every scan position equals the current value.
        # The intended expression appears to be ``self.amp * scl`` - confirm
        # before re-enabling, since it changes what is written to the machine.
        dval = 0
        self.N = Nsteps
        self.Ns = Nsamples
        self.values = np.linspace(val - dval, val + dval, num=self.N)

    def preaction(self):
        """Remember the current value of every pre-scan PV, then set it to its 'Val'."""
        for item in self.pre.values():
            item['InitVal'] = item['adj'].get_current_value(readback=False)
            item['adj'].set_target_value(item['Val'])

    def postaction(self):
        """Set every pre-scan PV back to the value stored by ``preaction``."""
        for item in self.pre.values():
            item['adj'].set_target_value(item['InitVal'])

    def scan(self):
        """Create the scan over ``self.values`` and run it.

        Requires ``setup()`` to have been called. The scan object is kept in
        ``self.sc``.
        """
        self.sc = self.scanner.ascan_list(self.adj, self.values,
                                          filename=self.branch,
                                          start_immediately=False,
                                          n_pulses=self.Ns,
                                          return_to_initial_values=True)
        # NOTE(review): preaction()/postaction() around the run and the
        # auxiliary read ``self.auxdata = getAux(self.aux)`` are disabled in
        # the original; they are left disabled here.
        self.sc.run()

    def stop(self):
        """Stop the current scan, if one has been created."""
        if self.sc is None:
            return
        self.sc.stop()

    def running(self):
        """Return the scan's ``running`` attribute, or False if there is no scan."""
        if self.sc is None:
            return False
        return self.sc.running

    def status(self):
        """Return ``(steps_recorded, steps_total)`` for the current scan.

        ``steps_recorded`` is the number of 'scan_values' entries in the scan
        info (0 if there is no scan or no such entry); ``steps_total`` is
        ``self.N``.
        """
        if self.sc is None:
            return 0, self.N
        si = self.sc.scan_info.to_dict()
        steps = len(si['scan_values']) if 'scan_values' in si else 0
        return steps, self.N

    def info(self):
        """Return the scan info as a dict (empty if there is no scan)."""
        if self.sc is None:
            return {}
        return self.sc.scan_info.to_dict()