Support for various app and slic wrapper

This commit is contained in:
2023-06-13 14:18:09 +02:00
parent f22a17852c
commit dc5df0ca2a
13 changed files with 488 additions and 31 deletions

175
app/dispersiontools.py Normal file
View File

@ -0,0 +1,175 @@
import datetime
import re
import numpy as np
from bsread import dispatcher
import epics
from slic.core.adjustable import PVAdjustable
from slic.core.acquisition import BSAcquisition
from slic.core.scanner import Scanner
def getAux(pvs=None):
if not pvs:
return
ret={}
val = epics.caget_many(pvs)
for i,pv in enumerate(pvs):
if val[i]: # filter out None values
ret[pv]=float(val[i])
epics.ca.clear_cache()
return ret
def getBSChannels(regexp):
prog = re.compile(regexp)
res = []
for bs in dispatcher.get_current_channels():
if prog.match(bs['name']):
res.append(bs['name'])
return res
class Dispersion:
def __init__(self, branch = 'Aramis'):
self.scanname = 'Dispersion'
self.branch = 'None'
dirpath= datetime.datetime.now().strftime('/sf/data/measurements/%Y/%m/%d/slic_sfbd')
self.scandir='%s/%s' % (dirpath,self.scanname)
self.setBranch()
self.sc = None
self.Nsteps = 2
self.Nsamples = 1
def setBranch(self,branch = 'Aramis'):
if branch == 'Athos Dump':
self.setupAthosDump()
elif branch == 'Aramis':
self.setupAramis()
else:
self.branch = 'None'
def setupAramis(self):
# pre-scan item
self.pre = {}
self.pre['SFB_BEAM_DUMP_AR:ONOFF1']={'Val':0,'InitVal':0}
self.pre['SFB_BEAM_ENERGY_ECOL_AR:ONOFF1']={'Val':0,'InitVal':0}
self.pre['SFB_ORBIT_S30:ONOFF1']={'Val':0,'InitVal':0}
self.pre['SFB_ORBIT_SAR:ONOFF1']={'Val':0,'InitVal':0}
for pv in self.pre.keys():
self.pre[pv]['adj']=PVAdjustable(pv)
# adjustable
self.adjSV = 'S30:SET-E-GAIN-OP'
self.adjRB = 'S30:GET-E-GAIN-OP'
self.adj = PVAdjustable(self.adjSV,pvname_readback = self.adjRB, accuracy = 0.1)
self.amp = 20 # the amplitude of the scan, which can be scaled
# acquisition
sensor1 = getBSChannels('SAR.*DBPM.*:[XY]1$')
sensor2 = getBSChannels('S[23].*-RLLE-DSP:.*-VS$')
self.sensor = sensor1+sensor2
self.acq = [BSAcquisition("machine","sfbd", default_channels=self.sensor)]
# auxiliar data to be read one
self.aux=[]
for sen in sensor2:
if 'PHASE-VS' in sen:
self.aux.append(sen.replace('PHASE-VS','GET-VSUM-PHASE-OFFSET').replace('RLLE-DSP','RSYS'))
self.aux.append(sen.replace('PHASE-VS','GET-VSUM-AMPLT-SCALE').replace('RLLE-DSP','RSYS'))
self.aux.append(sen.replace('PHASE-VS','SM-SET').replace('RLLE-DSP','RMSM'))
self.aux.append('S10BC02-MBND100:ENERGY-OP')
# scanner
self.branch='Aramis'
self.path = '%s-%s' % (self.scanname,self.branch)
self.scanner = Scanner(data_base_dir=self.path,scan_info_dir=self.path,
make_scan_sub_dir=True,
default_acquisitions=self.acq)
def setupAthosDump(self):
# pre-scan item
self.pre = {}
self.pre['SFB_BEAM_DUMP_AT:ONOFF1']={'Val':0,'InitVal':0}
self.pre['SFB_ORBIT_SAT:ONOFF1']={'Val':0,'InitVal':0}
for i in range(1,5):
self.pre['SFB_ORBIT_SAT_%2.2d:ONOFF1' % i ]={'Val':0,'InitVal':0}
for pv in self.pre.keys():
self.pre[pv]['adj']=PVAdjustable(pv)
# adjustable
self.adjSV = 'SATCB01-RSYS:SET-BEAM-PHASE'
self.adjRB = 'SATCB01-RSYS:GET-BEAM-PHASE'
self.adj = PVAdjustable(self.adjSV,pvname_readback = self.adjRB, accuracy = 0.1)
self.amp = 20 # the amplitude of the scan, which can be scaled
# acquisition
self.sensor = ['SATBD02-DBPM010:Y2','SATBD02-DBPM040:Y2']
self.acq = [BSAcquisition("machine","sfbd", default_channels=self.sensor)]
# auxiliar data to be read one
aux = ['SATCL01-MBND100:ENERGY-OP']
# scanner
self.branch='Athos_Dump'
self.path = '%s-%s' % (self.scanname,self.branch)
self.scanner = Scanner(data_base_dir=self.path,scan_info_dir=self.path,
make_scan_sub_dir=True,
default_acquisitions=self.acq)
def setup(self,scl = 1, Nsteps=5, Nsamples=5):
val = self.adj.get_current_value(readback=False)
dval = self.amp*scl
self.N = Nsteps
self.Ns= Nsamples
self.values=np.linspace(val-dval,val+dval,num=self.N)
def preaction(self):
for key in self.pre.keys():
self.pre[key]['InitVal'] = self.pre[key]['adj'].get_current_value(readback = False)
self.pre[key]['adj'].set_target_value(self.pre[key]['Val'])
def postaction(self):
for key in self.pre.keys():
self.pre[key]['adj'].set_target_value(self.pre[key]['InitVal'])
def scan(self):
self.sc=self.scanner.ascan_list(self.adj,self.values,
filename=self.scanname,start_immediately = False,
n_pulses=self.Ns,return_to_initial_values=True)
self.preaction()
self.sc.run()
self.auxdata = getAux(self.aux)
self.postaction()
def stop(self):
if self.sc is None:
return
self.sc.stop()
def running(self):
return self.sc.running
def status(self):
si = self.sc.scan_info.to_dict()
steps = 0
if 'scan_values' in si:
steps=len(si['scan_values'])
return steps,self.N
def info(self):
return self.sc.scan_info.to_dict()