Debug Dispersion measurement and added first support for XTCAV stabilizer

This commit is contained in:
2023-06-16 16:58:56 +02:00
parent dc5df0ca2a
commit a54abd383e
8 changed files with 132 additions and 56 deletions

View File

@ -40,13 +40,14 @@ class AdaptiveOrbit:
def initBSStream(self,channels):
print("Initializing BSstream")
bs = BSCache(100000,10000) # 1 second time out, capazity for 100 second.
bs.stop()
for cnl in channels[1:]:
if not is_available(cnl):
raise ValueError(f"BS-Channel {cbl} is not available")
res = make_channel_config(cnl,None,None)
bs.channels[res]=res
bs.get_var(channels[0]) # this starts also the stream into the cache
# bs.stop()
# for cnl in channels[1:]:
# if not is_available(cnl):
# raise ValueError(f"BS-Channel {cbl} is not available")
# res = make_channel_config(cnl,None,None)
# bs.channels[res]=res
# bs.get_var(channels[0]) # this starts also the stream into the cache
bs.get_vars(channels)
return bs
def initPV(self,chx):

View File

@ -6,9 +6,9 @@ from bsread import dispatcher
import epics
from slic.core.adjustable import PVAdjustable
from slic.core.acquisition import BSAcquisition
from slic.core.acquisition import BSAcquisition, PVAcquisition
from slic.core.scanner import Scanner
from sfbd.ext import CounterAdjustable
def getAux(pvs=None):
if not pvs:
@ -35,25 +35,74 @@ class Dispersion:
def __init__(self, branch = 'Aramis'):
self.scanname = 'Dispersion'
self.branch = 'None'
dirpath= datetime.datetime.now().strftime('/sf/data/measurements/%Y/%m/%d/slic_sfbd')
self.scandir='%s/%s' % (dirpath,self.scanname)
dirpath= datetime.datetime.now().strftime('measurements/%Y/%m/%d/slic_sfbd')
self.basedir = '/sf/data/%s/%s' % (dirpath,self.scanname)
self.setBranch()
self.pgroup='%s/%s' % (dirpath,self.scanname)
self.scandir='/sf/data/'+self.pgroup
self.setBranch(branch)
self.sc = None
self.Nsteps = 2
self.Nsamples = 1
def setBranch(self,branch = 'Aramis'):
self.sc = None
if branch == 'Athos Dump':
self.setupAthosDump()
elif branch == 'Aramis':
self.setupAramis()
elif branch == 'Athos':
self.setupAthos()
else:
self.branch = 'None'
self.scanner = None
return
# path is define in the various set-up procedures
self.scanner = Scanner(data_base_dir=self.basedir,scan_info_dir=self.basedir,
make_scan_sub_dir=True,
default_acquisitions=self.acq)
def getRFCalibrationChannels(self,sensor2,energy):
aux=[]
for sen in sensor2:
if 'PHASE-VS' in sen:
aux.append(sen.replace('PHASE-VS','GET-VSUM-PHASE-OFFSET').replace('RLLE-DSP','RSYS'))
aux.append(sen.replace('PHASE-VS','GET-VSUM-AMPLT-SCALE').replace('RLLE-DSP','RSYS'))
aux.append(sen.replace('PHASE-VS','SM-GET').replace('RLLE-DSP','RMSM'))
aux.append(energy)
return aux
def setupAthosDump(self):
self.branch='Athos_Dump'
pgroup = '%s-%s' % (self.pgroup,self.branch)
self.path = '/sf/data/'+pgroup
# pre-scan item - needs an adjustment of the dipole current of about 2.3e-3 - to be implemented later.
self.pre = {}
self.pre['SFB_BEAM_DUMP_AT:ONOFF1']={'Val':0,'InitVal':0}
self.pre['SFB_ORBIT_SAT:ONOFF1']={'Val':0,'InitVal':0}
for i in range(1,5):
self.pre['SFB_ORBIT_SAT_%2.2d:ONOFF1' % i ]={'Val':0,'InitVal':0}
for pv in self.pre.keys():
self.pre[pv]['adj']=PVAdjustable(pv)
# adjustable
self.adjSV = 'SATCB01-RSYS:SET-BEAM-PHASE'
self.adjRB = 'SATCB01-RSYS:GET-BEAM-PHASE'
self.adj = PVAdjustable(self.adjSV,pvname_readback = self.adjRB, accuracy = 0.1)
self.amp = 30 # the amplitude of the scan, which can be scaled
# acquisition
sensor1 = getBSChannels('SATBD02-DBPM.*:Y2$')
sensor2 = getBSChannels('SATCB.*-RLLE-DSP:.*-VS$')
self.sensor = sensor1+sensor2
self.acq = [BSAcquisition(".",pgroup, default_channels=self.sensor)]
# auxiliar data to be read one
self.aux = self.getRFCalibrationChannels(sensor2,'SATCL01-MBND100:ENERGY-OP')
def setupAramis(self):
self.branch='Aramis'
pgroup = '%s-%s' % (self.pgroup,self.branch)
self.path = '/sf/data/'+pgroup
# pre-scan item
self.pre = {}
self.pre['SFB_BEAM_DUMP_AR:ONOFF1']={'Val':0,'InitVal':0}
@ -71,55 +120,46 @@ class Dispersion:
sensor1 = getBSChannels('SAR.*DBPM.*:[XY]1$')
sensor2 = getBSChannels('S[23].*-RLLE-DSP:.*-VS$')
self.sensor = sensor1+sensor2
self.acq = [BSAcquisition("machine","sfbd", default_channels=self.sensor)]
self.acq = [BSAcquisition(".",pgroup, default_channels=self.sensor)]
# auxiliar data to be read one
self.aux=[]
for sen in sensor2:
if 'PHASE-VS' in sen:
self.aux.append(sen.replace('PHASE-VS','GET-VSUM-PHASE-OFFSET').replace('RLLE-DSP','RSYS'))
self.aux.append(sen.replace('PHASE-VS','GET-VSUM-AMPLT-SCALE').replace('RLLE-DSP','RSYS'))
self.aux.append(sen.replace('PHASE-VS','SM-SET').replace('RLLE-DSP','RMSM'))
self.aux.append('S10BC02-MBND100:ENERGY-OP')
self.aux = self.getRFCalibrationChannels(sensor2,'S10BC02-MBND100:ENERGY-OP')
# scanner
def setupAthos(self):
self.branch='Aramis'
self.path = '%s-%s' % (self.scanname,self.branch)
self.scanner = Scanner(data_base_dir=self.path,scan_info_dir=self.path,
make_scan_sub_dir=True,
default_acquisitions=self.acq)
def setupAthosDump(self):
pgroup = '%s-%s' % (self.pgroup,self.branch)
self.path = '/sf/data/'+pgroup
# pre-scan item
self.pre = {}
self.pre['SFB_BEAM_DUMP_AT:ONOFF1']={'Val':0,'InitVal':0}
self.pre['SFB_BEAM_ENERGY_ECOL_AT:ONOFF1']={'Val':0,'InitVal':0}
self.pre['SFB_ORBIT_SWY:ONOFF1']={'Val':0,'InitVal':0}
self.pre['SFB_ORBIT_SAT:ONOFF1']={'Val':0,'InitVal':0}
for i in range(1,5):
self.pre['SFB_ORBIT_SAT_%2.2d:ONOFF1' % i ]={'Val':0,'InitVal':0}
for pv in self.pre.keys():
self.pre[pv]['adj']=PVAdjustable(pv)
# adjustable
self.adjSV = 'SATCB01-RSYS:SET-BEAM-PHASE'
self.adjRB = 'SATCB01-RSYS:GET-BEAM-PHASE'
self.adjSV = 'S20:SET-E-GAIN-OP'
self.adjRB = 'S20:GET-E-GAIN-OP'
self.adj = PVAdjustable(self.adjSV,pvname_readback = self.adjRB, accuracy = 0.1)
self.amp = 20 # the amplitude of the scan, which can be scaled
# self.adj2SV = 'S30:SET-E-GAIN-OP'
# self.adj2RB = 'S30:GET-E-GAIN-OP'
# self.adj2 = PVAdjustable(self.adj2SV,pvname_readback = self.adj2RB, accuracy = 0.1)
# self.adj = CounterAdjustable(self.adj1,self.adj2) # combine the two channels
self.amp = 10 # the amplitude of the scan, which can be scaled
# acquisition
self.sensor = ['SATBD02-DBPM010:Y2','SATBD02-DBPM040:Y2']
self.acq = [BSAcquisition("machine","sfbd", default_channels=self.sensor)]
sensor1 = getBSChannels('SAT[SDC].*DBPM.*:[XY]2$')
sensor2 = getBSChannels('S[2].*-RLLE-DSP:.*-VS$')
self.sensor = sensor1+sensor2
self.acq = [BSAcquisition(".",pgroup, default_channels=self.sensor)]
# auxiliar data to be read one
aux = ['SATCL01-MBND100:ENERGY-OP']
# scanner
self.branch='Athos_Dump'
self.path = '%s-%s' % (self.scanname,self.branch)
self.scanner = Scanner(data_base_dir=self.path,scan_info_dir=self.path,
make_scan_sub_dir=True,
default_acquisitions=self.acq)
self.aux = self.getRFCalibrationChannels(sensor2,'S10BC02-MBND100:ENERGY-OP')
def setup(self,scl = 1, Nsteps=5, Nsamples=5):
val = self.adj.get_current_value(readback=False)
dval = self.amp*scl
dval = 0
self.N = Nsteps
self.Ns= Nsamples
self.values=np.linspace(val-dval,val+dval,num=self.N)
@ -137,7 +177,7 @@ class Dispersion:
def scan(self):
self.sc=self.scanner.ascan_list(self.adj,self.values,
filename=self.scanname,start_immediately = False,
filename=self.branch,start_immediately = False,
n_pulses=self.Ns,return_to_initial_values=True)
self.preaction()
self.sc.run()

View File

@ -3,6 +3,7 @@ import numpy as np
from slic.core.acquisition import PVAcquisition
from slic.core.acquisition import BSAcquisition
from slic.core.adjustable import PVAdjustable
from slic.devices.general import motor
from slic.core.scanner import Scanner
from sfbd.ext import CamAcquisition
@ -17,6 +18,7 @@ class LaserScanBase:
print('Init Base Class')
self.SV= 'SSL-LMOT-M1104:MOT'
self.pol = motor.Motor(self.SV)
# self.pol = PVAdjustable(self.SV)
def stop(self):
if self.sc is None:
@ -36,11 +38,13 @@ class LaserScanBase:
def info(self):
return self.sc.scan_info.to_dict()
def setup(self,amax=21,Nsteps=5,Nsamples=5):
def setup(self,amax=45,Nsteps=5,Nsamples=5):
amin = 0
self.N = Nsteps
self.Ns= Nsamples
self.values=np.linspace(19,21,num=self.N) # needs a change
amin = 15
amax = 22
self.values=np.linspace(amin,amax,num=self.N) # needs a change
# measuring the pulse energy as a function of the controling PV. Note that the power should be limited to 300 uJ
# thus limiting the value of the actuaor defining the lase rpulse energy in the EnergyModulaiton class.
@ -54,7 +58,7 @@ class LaserPower(LaserScanBase):
self.scandir='%s/%s' % (dirpath,self.scanname)
self.RB = 'SSL-LENG-SLNK1:VAL_GET'
self.erg = PollingPVAcquisition("machine","sfbd", default_channels=[self.RB])
self.erg = PVAcquisition("machine","sfbd", default_channels=[self.RB])
self.scanner = Scanner(data_base_dir=self.scandir,scan_info_dir=self.scandir,make_scan_sub_dir=True,
default_acquisitions=[self.erg])

View File

@ -2,7 +2,6 @@ import time
import numpy as np
from bstrd import BSCache
from bstrd.bscache import make_channel_config, is_available
from epics import PV
class SpectralAnalysis:

29
app/xtcavstabilizer.py Normal file
View File

@ -0,0 +1,29 @@
import time
import numpy as np
from bstrd import BSCache
class XTCAVStabilizer:
"""
Wrapper class to bundle all daq/io needed for stabilizing the XTCAV
"""
def __init__(self):
self.bs = BSCache(100000,10000) # 100 second timeout, size for 100 second data taken
self.channels = ['SATBD02-DBPM040:X2','SATMA02-RLLE-DSP:PHASE-VS']
self.bs.get_vars(self.channels) # this starts the stream into the cache
self.bs.stop()
def terminate(self):
print('Stopping BSStream Thread...')
self.bs.stop()
self.bs.pt.running.clear() # for some reason I have to
def flush(self):
self.bs.flush()
def read(self):
data=self.bs.__next__()
return data['pid'],data[self.channels[0]],data[self.channels[1]] # returns PID, BPM reading, TCAV Phase

View File

@ -39,12 +39,12 @@ def loadActuator(hid,scanrun=1):
data = {}
if 'actuators' in hid[run]['method'].keys():
for key1 in hid[run]['method']['actuators'].keys():
if isinstance(hid[run]['method']['actuators'],h5py.Group):
if isinstance(hid[run]['method']['actuators'][key1],h5py.Group):
for key2 in hid[run]['method']['actuators'][key1].keys():
val = hid[run]['method']['actuators'][key1][key2][()]
data[key1+':'+key2]={'val':val}
else:
data[key1]=hid[run]['method']['actuators'][key1][()]
else:
data[key1]=hid[run]['method']['actuators'][key1][()]
return data

View File

@ -154,6 +154,8 @@ def writeAnalysis(hid,data,scanrun=1):
def writeFigure(filename,figs):
for i,ele in enumerate(figs):
if ele == None:
continue
plotname='%s_Fig%d.png' % (filename,(i+1))
im = Image.open(ele)
im.save(plotname)

View File

@ -28,11 +28,12 @@ class SlicScan(QObject):
def start(self,daq,snap=False):
self.clear()
self.daq=daq
Thread(target=self.Tmonitor).start()
self.startSnap(snap)
def startSnap(self,snap=False):
if not snap:
if snap:
Thread(target=self.Tsnap).start()
def Tsnap(self):