Update Master with a functional version of sfbd #1

Merged
reiche merged 13 commits from dev into master 2024-01-10 09:12:37 +01:00
23 changed files with 1454 additions and 0 deletions

0
__init__.py Normal file
View File

5
app/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
from .adaptiveorbit import AdaptiveOrbit
from .spectralanalysis import SpectralAnalysis
from .hero import LaserPower,EnergyModulation
from .dispersiontools import Dispersion
from .xtcavstabilizer import XTCAVStabilizer

131
app/adaptiveorbit.py Normal file
View File

@@ -0,0 +1,131 @@
import time
import numpy as np
from bstrd import BSCache
from bstrd.bscache import make_channel_config, is_available
from epics import PV
class AdaptiveOrbit:
"""
Wrapper class to bundle all daq/io needed for adaptive orbit feedback.
"""
def __init__(self):
# Aramis Channels
self.ARch0 = 'SARFE10-PBIG050-EVR0:CALCI'
self.ARchx = ['SARUN%2.2d-DBPM070:X1' % id for id in range(1,17)]
self.ARchy = ['SARUN%2.2d-DBPM070:Y1' % id for id in range(1,17)]
self.bsAR = self.initBSStream([self.ARch0]+self.ARchx+self.ARchy)
self.pvAR = self.initPV(self.ARchx)
self.kickerAR = self.initPV(['SARMA02-MCRX050:I-SET','SARMA02-MCRY050:I-SET','SARUN02-MCRX080:I-SET','SARUN02-MCRY080:I-SET','SFB_ORBIT_SAR:ONOFF1'])
# Athos Channels
self.ATch0 = 'SATFE10-PEPG046-EVR0:CALCI'
self.ATchx=[]
self.ATchy=[]
for bpm in range(5,23):
idx = '070'
if bpm == 5 or bpm ==14:
idx='410'
self.ATchx.append('SATUN%2.2d-DBPM%s:X2' % (bpm,idx))
self.ATchy.append('SATUN%2.2d-DBPM%s:Y2' % (bpm,idx))
self.bsAT = self.initBSStream([self.ATch0]+self.ATchx+self.ATchy)
self.pvAT = self.initPV(self.ATchx)
self.kickerAT = self.initPV(['SATMA01-MCRX610:I-SET','SATMA01-MCRY610:I-SET','SATUN05-MCRX420:I-SET','SATUN05-MCRY420:I-SET','SFB_ORBIT_SAT:ONOFF1'])
# select first beamline
self.isAramis = True
def initBSStream(self,channels):
print("Initializing BSstream")
bs = BSCache(100000,100000) # 1000 second time out, capazity for 1000 second.
bs.get_vars(channels)
return bs
def initPV(self,chx):
print("Initializing EPICS Channels")
pvs = []
for x in chx:
if ':X1' in x:
pvs.append(PV(x.replace(':X1',':X-REF-FB')))
pvs.append(PV(x.replace(':X1',':Y-REF-FB')))
elif ':X2' in x:
pvs.append(PV(x.replace(':X2',':X-REF-FB')))
pvs.append(PV(x.replace(':X2',':Y-REF-FB')))
else:
pvs.append(PV(x))
con = [pv.wait_for_connection(timeout=0.2) for pv in pvs]
for i, val in enumerate(con):
if val is False:
name = pv[i].pvname
raise ValueError(f"PV-Channel {name} is not available")
return pvs
def flush(self):
self.bsAR.flush()
self.bsAT.flush()
def terminate(self):
print('Stopping BSStream Thread...')
self.bsAR.stop()
self.bsAR.pt.running.clear() # for some reason I have to
self.bsAT.stop()
self.bsAT.pt.running.clear() # for some reason I have to
def getBeamline(self,beamline):
if beamline == 'Aramis':
self.isAramis = True
else:
self.isAramis = False
# all routine for accessing the machine (read/write)
def read(self):
if self.isAramis:
data=self.bsAR.__next__()
return data['pid'],data[self.ARch0],np.array([data[cnl] for cnl in self.ARchx]),np.array([data[cnl] for cnl in self.ARchy])
data=self.bsAT.__next__()
return data['pid'],data[self.ATch0],np.array([data[cnl] for cnl in self.ATchx]),np.array([data[cnl] for cnl in self.ATchy])
def readPV(self):
if self.isAramis:
return [pv.value for pv in self.pvAR]
return [pv.value for pv in self.pvAT]
def setPV(self,fbval):
if self.isAramis:
for i in range(len(fbval)):
self.pvAR[i].value = fbval[i]
else:
for i in range(len(fbval)):
self.pvAT[i].value = fbval[i]
def getPVNames(self):
if self.isAramis:
return [pv.pvname for pv in self.pvAR]
return [pv.pvname for pv in self.pvAT]
def getDetectorName(self):
if self.isAramis:
return self.ARch0
return self.ATch0
def getKicker(self):
if self.isAramis:
return [pv.value for pv in self.kickerAR]
return [pv.value for pv in self.kickerAT]
def setKicker(self,vals):
if self.isAramis:
for i,val in enumerate(vals):
self.kickerAR[i].value = val
return
for i,val in enumerate(vals):
self.kickerAT[i].value = val

316
app/dispersiontools.py Normal file
View File

@@ -0,0 +1,316 @@
import datetime
import re
import numpy as np
from bsread import dispatcher
import epics
from slic.core.adjustable import PVAdjustable
from slic.core.acquisition import BSAcquisition, PVAcquisition
from slic.core.scanner import Scanner
from sfbd.ext import CounterAdjustable
from sfbd.ext import BSCAcquisition
from sfbd.interface import SlicScan
from bstrd import BSCache
def getAux(pvs=None):
if not pvs:
return
ret={}
val = epics.caget_many(pvs)
for i,pv in enumerate(pvs):
if val[i]: # filter out None values
ret[pv]=float(val[i])
return ret
def getBSChannels(regexp):
prog = re.compile(regexp)
res = []
for bs in dispatcher.get_current_channels():
if prog.match(bs['name']):
res.append(bs['name'])
return res
def getRFCalibrationChannels(sensors,energy):
aux=[]
for sen in sensors:
if 'PHASE-VS' in sen:
aux.append(sen.replace('PHASE-VS','GET-VSUM-PHASE-OFFSET').replace('RLLE-DSP','RSYS'))
aux.append(sen.replace('PHASE-VS','GET-VSUM-AMPLT-SCALE').replace('RLLE-DSP','RSYS'))
aux.append(sen.replace('PHASE-VS','SM-GET').replace('RLLE-DSP','RMSM'))
aux.append(energy)
return aux
class Dispersion:
def __init__(self, branch = 'Aramis'):
self.scanname = 'Dispersion'
self.branch = None
dirpath= datetime.datetime.now().strftime('measurements/%Y/%m/%d/slic_sfbd')
self.basedir = '/sf/data/%s/%s' % (dirpath,self.scanname)
self.pgroup='%s/%s' % (dirpath,self.scanname)
self.scandir='/sf/data/'+self.pgroup
self.setBranch(branch) # enfore Athos dump settings for now
self.Nsteps = 2
self.Nsamples = 1
self.bsc = None
def setBranch(self,branch = 'Aramis'):
self.sc = None
if branch == 'Athos Dump':
self.setupAthosDump()
elif branch == 'Aramis':
self.setupAramis()
elif branch == 'Athos':
self.setupAthos()
elif branch =='Bunch Compressor 2':
self.setupBC2()
else:
self.branch = None
return self.branch
def setup(self,scl = 1, Nsteps=5, Nsamples=10):
# the setup is done from the main thread. Therefore I have to define the BSC here
self.N = Nsteps
self.Ns= Nsamples
self.scale = scl
# define stream
print('Getting BSCache')
self.bsc = BSCache(100000,10000) # 100 second timeout, size for 100 second data taken
self.bsc.get_vars(self.sensor) # this starts the stream into the cache
print('Getting BSCache done')
def scan(self):
# core routine to do all action. Will be a thread of the slic scanner wrapper
if not self.branch:
return
# define acquisition channels wrapper for BSQueue
pgroup = '%s-%s' % (self.pgroup,self.branch)
acq = [BSCAcquisition(".",pgroup, default_channels=[self.bsc])]
# define the scanner
self.scanner = Scanner(data_base_dir=self.basedir, # root directory of data location e.g. /sf/data/2023/02/02/Dispersion
scan_info_dir=self.basedir, # write also scan info there
make_scan_sub_dir=True, # put each scan in its own directory
default_acquisitions=acq) # adjustable to use
# define adjustable
self.adj = PVAdjustable(self.adjSV,pvname_readback = self.adjRB, accuracy = 0.1,ID = self.adjSV, name = self.name)
val = self.adj.get_current_value(readback=False)
if self.adj2SV:
self.adj2 = PVAdjustable(self.adj2SV,pvname_readback = self.adj2RB, accuracy = 0.1,ID = self.adjSV, name = self.name)
val2 = self.adj2.get_current_value(readback=False)
dval = self.amp*self.scale
print(self.adjSV,' - Scan Range:',val-dval,'to',val+dval)
if self.adj2SV:
print(self.adj2SV,' - Scan Range:',val2+dval,'to',val2-dval)
# create scanner backend
if self.adj2SV:
self.sc=self.scanner.a2scan(self.adj, val-dval, val+dval,
self.adj2, val2+dval, val2-dval,
n_intervals = self.N-1, # steps
n_pulses = self.Ns, # samples
filename=self.branch, # relative directory for data
start_immediately = False, # wait for execution to performe pre-action items
return_to_initial_values=True) # return to initial values
else:
self.sc=self.scanner.ascan(self.adj, val-dval, val+dval,
n_intervals = self.N-1, # steps
n_pulses = self.Ns, # samples
filename=self.branch, # relative directory for data
start_immediately = False, # wait for execution to performe pre-action items
return_to_initial_values=True) # return to initial values
# get aux data first
self.auxdata = getAux(self.aux)
self.preaction()
self.sc.run()
self.postaction()
self.bsc.stop() # stop the queue
#################################
# definition of the individual branches
def setupBC2(self):
# branch and name tag
self.branch='Bunch Compressor 2'
self.name ='BC2'
# pre-scan item - needs an adjustment of the dipole current of about 2.3e-3 - to be implemented later.
self.pre = {}
self.pre['SFB_COMPRESSION_BC2_AR:ONOFF1']={'Val':0,'InitVal':0}
self.pre['SFB_COMPRESSION_BC2_AR:ONOFF2']={'Val':0,'InitVal':0}
self.pre['SFB_ORBIT_S10:ONOFF1']={'Val':0,'InitVal':0}
for pv in self.pre.keys():
self.pre[pv]['adj']=PVAdjustable(pv)
# adjustable
self.adjSV = 'S10:SET-E-GAIN-OP'
self.adjRB = 'S10:GET-E-GAIN-OP'
self.adj2SV = 'S20:SET-E-GAIN-OP' # counter adjustable
self.adj2RB = 'S20:GET-E-GAIN-OP'
self.amp = 5 # the amplitude of the scan, which can be scaled
# acquisition
sensor1 = getBSChannels('S10[BM].*-DBPM.*:[XY]1$')
sensor2 = getBSChannels('S1.*-RLLE-DSP:.*-VS$')
self.sensor = sensor1+sensor2
# auxiliar data to be read one
self.aux = getRFCalibrationChannels(sensor2,'SINBC02-MBND100:ENERGY-OP')
def setupAthosDump(self):
# branch and name tag
self.branch='Athos_Dump'
self.name ='SATCB01-Linac'
# pre-scan item - needs an adjustment of the dipole current of about 2.3e-3 - to be implemented later.
self.pre = {}
self.pre['SFB_BEAM_DUMP_AT:ONOFF1']={'Val':0,'InitVal':0}
self.pre['SFB_ORBIT_SAT:ONOFF1']={'Val':0,'InitVal':0}
for i in range(1,5):
self.pre['SFB_ORBIT_SAT_%2.2d:ONOFF1' % i ]={'Val':0,'InitVal':0}
for pv in self.pre.keys():
self.pre[pv]['adj']=PVAdjustable(pv)
# adjustable
self.adjSV = 'SATCB01-RSYS:SET-BEAM-PHASE'
self.adjRB = 'SATCB01-RSYS:GET-BEAM-PHASE'
self.adj2SV = None
self.adj2RB = None
self.amp = 30 # the amplitude of the scan, which can be scaled
# acquisition
sensor1 = getBSChannels('SATBD02-DBPM.*:Y2$')
sensor2 = getBSChannels('SATCB.*-RLLE-DSP:.*-VS$')
self.sensor = sensor1+sensor2
# auxiliar data to be read one
self.aux = getRFCalibrationChannels(sensor2,'SATCL01-MBND100:ENERGY-OP')
def setupAramis(self):
# branch and name tag
self.branch='Aramis'
self.name = 'Linac3'
# pre-scan item
self.pre = {}
self.pre['SFB_BEAM_DUMP_AR:ONOFF1']={'Val':0,'InitVal':0}
self.pre['SFB_BEAM_ENERGY_ECOL_AR:ONOFF1']={'Val':0,'InitVal':0}
self.pre['SFB_ORBIT_S30:ONOFF1']={'Val':0,'InitVal':0}
self.pre['SFB_ORBIT_SAR:ONOFF1']={'Val':0,'InitVal':0}
for pv in self.pre.keys():
self.pre[pv]['adj']=PVAdjustable(pv)
# adjustable
self.adjSV = 'S30:SET-E-GAIN-OP'
self.adjRB = 'S30:GET-E-GAIN-OP'
self.adj2SV = None
self.adj2RB = None
self.amp = 20 # the amplitude of the scan, which can be scaled
# acquisition
sensor1 = getBSChannels('SAR[CMU].*DBPM.*:[XY]1$')
sensor2 = getBSChannels('S[23].*-RLLE-DSP:.*-VS$')
self.sensor = sensor1+sensor2
# auxiliar data to be read one
self.aux = getRFCalibrationChannels(sensor2,'S10BC02-MBND100:ENERGY-OP')
def setupAthos(self):
# branch and name tag
self.branch='Athos'
self.name = 'Linac2+3'
# pre-scan item
self.pre = {}
self.pre['SFB_BEAM_DUMP_AT:ONOFF1']={'Val':0,'InitVal':0}
self.pre['SFB_BEAM_ENERGY_ECOL_AT:ONOFF1']={'Val':0,'InitVal':0}
self.pre['SFB_ORBIT_SWY:ONOFF1']={'Val':0,'InitVal':0}
self.pre['SFB_ORBIT_SAT:ONOFF1']={'Val':0,'InitVal':0}
for i in range(1,5):
self.pre['SFB_ORBIT_SAT_%2.2d:ONOFF1' % i ]={'Val':0,'InitVal':0}
for pv in self.pre.keys():
self.pre[pv]['adj']=PVAdjustable(pv)
# adjustable
self.adjSV = 'S20:SET-E-GAIN-OP'
self.adjRB = 'S20:GET-E-GAIN-OP'
self.adj2SV = 'S30:SET-E-GAIN-OP' # counter adjustable
self.adj2RB = 'S30:GET-E-GAIN-OP'
self.amp = 5 # the amplitude of the scan, which can be scaled
# acquisition
sensor1 = getBSChannels('SAT[SDC].*DBPM.*:[XY]2$')
sensor2 = getBSChannels('S[2].*-RLLE-DSP:.*-VS$')
self.sensor = sensor1+sensor2
# auxiliar data to be read one
self.aux = getRFCalibrationChannels(sensor2,'S10BC02-MBND100:ENERGY-OP')
#########################
# some basic interaction with the scan functionality
def preaction(self):
for key in self.pre.keys():
self.pre[key]['InitVal'] = self.pre[key]['adj'].get_current_value(readback = False)
self.pre[key]['adj'].set_target_value(self.pre[key]['Val'])
def postaction(self):
for key in self.pre.keys():
self.pre[key]['adj'].set_target_value(self.pre[key]['InitVal'])
def stop(self):
if self.sc is None:
return
self.sc.stop()
def running(self):
if self.sc is None:
return False
return self.sc.running
def status(self):
if self.sc is None:
return 0,0
si = self.sc.scan_info.to_dict()
steps = 0
if 'scan_values' in si:
steps=len(si['scan_values'])
return steps,self.N
def info(self):
if self.sc is None:
return None
return self.sc.scan_info.to_dict()
#--------------------
# main implementation for debugging
if __name__ == '__main__':
daq = Dispersion()
daq.setup(1,10,100)
# threaded execution
scanner=SlicScan()
scanner.start(daq,True)

115
app/hero.py Normal file
View File

@@ -0,0 +1,115 @@
import datetime
import numpy as np
from slic.core.acquisition import PVAcquisition
from slic.core.acquisition import BSAcquisition
from slic.core.adjustable import PVAdjustable
from slic.devices.general import motor
from slic.core.scanner import Scanner
from sfbd.ext import CamAcquisition
# some temporary wrapper
class PollingPVAcquisition(PVAcquisition):
def _acquire(self, *args, polling=True, **kwargs):
return super()._acquire(*args, polling=polling, **kwargs)
class LaserScanBase:
def __init__(self):
print('Init Base Class')
self.SV= 'SSL-LMOT-M1104:MOT'
self.pol = motor.Motor(self.SV)
# self.pol = PVAdjustable(self.SV)
def stop(self):
if self.sc is None:
return
self.sc.stop()
def running(self):
return self.sc.running
def status(self):
si = self.sc.scan_info.to_dict()
steps = 0
if 'scan_values' in si:
steps=len(si['scan_values'])
return steps,self.N
def info(self):
return self.sc.scan_info.to_dict()
def setup(self,amax=45,Nsteps=5,Nsamples=5):
amin = 0
self.N = Nsteps
self.Ns= Nsamples
amin = 15
amax = 22
self.values=np.linspace(amin,amax,num=self.N) # needs a change
# measuring the pulse energy as a function of the controling PV. Note that the power should be limited to 300 uJ
# thus limiting the value of the actuaor defining the lase rpulse energy in the EnergyModulaiton class.
class LaserPower(LaserScanBase):
def __init__(self):
super(LaserPower,self).__init__()
self.scanname = 'HEROLaserEnergy'
dirpath= datetime.datetime.now().strftime('/sf/data/measurements/%Y/%m/%d/slic_sfbd')
self.scandir='%s/%s' % (dirpath,self.scanname)
self.RB = 'SSL-LENG-SLNK1:VAL_GET'
self.erg = PVAcquisition("machine","sfbd", default_channels=[self.RB])
self.scanner = Scanner(data_base_dir=self.scandir,scan_info_dir=self.scandir,make_scan_sub_dir=True,
default_acquisitions=[self.erg])
def scan(self):
self.sc=self.scanner.ascan_list(self.pol,self.values,
filename=self.scanname,start_immediately = False,
n_pulses=self.Ns,return_to_initial_values=True)
self.sc.run()
# measuring the coherent emission/space charge blow-up as a function of the hero energy modulation
class EnergyModulation(LaserScanBase):
def __init__(self, acq = 0):
super(EnergyModulation,self).__init__()
self.scanname = 'HEROEnergyModulation'
dirpath= datetime.datetime.now().strftime('/sf/data/measurements/%Y/%m/%d/slic_sfbd')
self.scandir='%s/%s' % (dirpath,self.scanname)
self.acq = acq
if self.acq == 0:
self.RB ='SATFE10-PEPG046-EVR0:CALCI'
self.erg = BSAcquisition("machine","sfbd", default_channels=[self.RB])
elif self.acq == 1:
self.RB ='SATBD02-DBPM040:Y2'
self.erg = BSAcquisition("machine","sfbd", default_channels=[self.RB])
elif self.acq == 2:
self.RB = 'SATBD01-DSCR210'
self.erg = CamAcquisition("machine","sfbd", default_channels=[self.RB])
self.erg.getConnection(self.RB)
else:
self.RB = 'SATBD02-DSCR050'
self.erg = CamAcquisition("machine","sfbd", default_channels=[self.RB])
self.erg.getConnection(self.RB)
self.scanner = Scanner(data_base_dir=self.scandir,scan_info_dir=self.scandir,make_scan_sub_dir=True,
default_acquisitions=[self.erg])
def scan(self):
self.sc=self.scanner.ascan_list(self.pol,self.values,
filename=self.scanname,start_immediately = False,
n_pulses=self.Ns,return_to_initial_values=True)
self.sc.run()

5
app/runDispersionTools.sh Executable file
View File

@@ -0,0 +1,5 @@
#!/bin/bash
export PYTHONPATH=/sf/bd/packages/slic:/sf/bd/packages/bstrd:$PYTHONPATH
python dispersiontools.py

49
app/spectralanalysis.py Normal file
View File

@@ -0,0 +1,49 @@
import time
import numpy as np
from bstrd import BSCache
from epics import PV
class SpectralAnalysis:
"""
Wrapper class to bundle all daq/io needed for adaptive orbit feedback.
"""
def __init__(self):
self.bs = BSCache(100000,10000) # 100 second timeout, size for 100 second data taken
self.bs.stop()
self.channel = ''
self.channels = ['SARFE10-PSSS059:SPECTRUM_Y',
'SATOP21-PMOS127-2D:SPECTRUM_Y',
'SATOP31-PMOS132-2D:SPECTRUM_Y']
self.isConnected = False
def connect(self,ich):
if ich < 0 or ich >= len(self.channels):
return False
self.channel = self.channels[ich]
print('Connecting to BS-Channel:',self.channel)
self.bs.channels.clear()
self.bs.get_var(self.channel) # this starts the stream into the cache
self.pv = PV(self.channel.replace('_Y','_X'))
def terminate(self):
print('Stopping BSStream Thread...')
self.bs.stop()
self.bs.pt.running.clear() # for some reason I have to
def flush(self):
self.bs.flush()
def read(self):
data=self.bs.__next__()
return data['pid'],data[self.channel]
def readPV(self):
return self.pv.value
def getSpectrometerName(self):
return self.channel

51
app/xtcavstabilizer.py Normal file
View File

@@ -0,0 +1,51 @@
import time
import numpy as np
from bstrd import BSCache
from epics import PV
class XTCAVStabilizer:
"""
Wrapper class to bundle all daq/io needed for stabilizing the XTCAV
"""
def __init__(self):
# the PV
self.PVTCAV = PV('SATMA02-RMSM:SM-GET',connection_timeout=0.9, callback=self.stateChanged)
self.state=self.PVTCAV.value == 9
self.PVVolt = PV('SATMA02-RSYS:SET-ACC-VOLT')
self.PVPhase = PV('SATMA02-RSYS:SET-BEAM-PHASE')
# the BS channels
self.bs = BSCache(100000,10000) # 100 second timeout, size for 100 second data taken
self.channels = ['SATBD02-DBPM040:X2','SATMA02-RLLE-DSP:PHASE-VS','SATBD02-DBPM040:X2-VALID']
self.validation = self.channels[2]
self.bs.get_vars(self.channels) # this starts the stream into the cache
self.bs.stop()
def getPhase(self):
return self.PVPhase.value
def setPhase(self,phi):
self.PVPhase.set(phi)
def getVoltage(self):
return self.PVVolt.value
def stateChanged(self,pvname=None, value=0, **kws):
self.state = value == 9
def terminate(self):
print('Stopping BSStream Thread...')
self.bs.stop()
self.bs.pt.running.clear() # for some reason I have to
def flush(self):
self.bs.flush()
def read(self):
return self.bs.__next__()

4
ext/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
from .magnet import Magnet
from .camacquisition import CamAcquisition
from .counteradjustable import CounterAdjustable
from .bscacquisition import BSCAcquisition

40
ext/bscacquisition.py Normal file
View File

@@ -0,0 +1,40 @@
import h5py
import numpy as np
from slic.core.acquisition.acquisition import Acquisition
# class using the BSQueue to avoid to reestablish a stream for each step.
class BSCAcquisition(Acquisition):
def _acquire(self, filename, channels=None, data_base_dir=None, scan_info=None, n_pulses=100, **kwargs):
queue =channels[0] # abusing interface since BSAcquisition assume a list of channels
# allocating space
data={}
chns = queue.channels
for chn in chns:
data[chn]={'data':np.zeros((n_pulses))}
data['pulse_id']=np.zeros((n_pulses))
# clear the queue
queue.flush()
for i in range(n_pulses):
msg = queue.__next__() # pull data from cache
for chn in chns:
data[chn]['data'][i] = msg[chn]
data['pulse_id'][i]=msg['pid']
# write out the data file
hid = h5py.File(filename,'w')
hid.create_dataset('pulse_id', data = data['pulse_id'])
for chn in chns:
gid = hid.create_group(chn)
for key in data[chn].keys():
gid.create_dataset(key, data = data[chn][key])
hid.close()

41
ext/camacquisition.py Normal file
View File

@@ -0,0 +1,41 @@
from time import sleep
from tqdm import trange
import h5py
from cam_server_client import PipelineClient
from cam_server_client.utils import get_host_port_from_stream_address
from bsread import source, SUB
from slic.core.acquisition.acquisition import Acquisition
class CamAcquisition(Acquisition):
def getConnection(self,cam):
pipeline_client = PipelineClient()
cam_instance_name = str(cam) + "_sp1"
stream_address = pipeline_client.get_instance_stream(cam_instance_name)
self.host, self.port = get_host_port_from_stream_address(stream_address)
print(self.host,self.port)
def _acquire(self, filename, channels=None, data_base_dir=None, scan_info=None, n_pulses=100, **kwargs):
print("my routine")
print("extra kwargs:", kwargs)
args = (filename, n_pulses, channels)
args = ", ".join(repr(i) for i in args)
print("acquire({})".format(args))
print(f"dummy acquire to {filename}:")
# stream_host,stream_port = getPipeLine(channels[0])
# time.wait(1)
data= []
with source(host=self.host, port=self.port, mode=SUB) as input_stream:
input_stream.connect()
for i in range(n_pulses):
print('Camera Images', i)
message = input_stream.receive()
data.append(message.data.data)
hid = h5py.File(filename,'w')
gid = hid.create_group(channels[0])
for key in data[0].keys():
gid.create_dataset(key, data = [rec[key].value for rec in data])
hid.close()

23
ext/counteradjustable.py Normal file
View File

@@ -0,0 +1,23 @@
from slic.core.adjustable import Adjustable
class CounterAdjustable(Adjustable):
def __init__(self, adjustable1, adjustable2):
self.adj1=adjustable1
self.adj2=adjustable2
self.ref_values() # implementation needs reference values to convert absolute scan to relative scan
def ref_value(self):
self.val1 = self.adj1.get_current_value(readback = False)
self.val2 = self.adj2.get_current_value(readback = False)
def set_target_value(self, value):
t1 = self.adj1.set_target_value(self.val1 + value)
t2 = self.adj2.set_target_value(self.val2 - value)
t1.wait()
t2.wait()
def get_current_value(self):
return self.adj1.get_current_value()
def is_moving(self):
return any([self.adj1.is_moving(),self.adj2.is_moving()])

21
ext/magnet.py Normal file
View File

@@ -0,0 +1,21 @@
from slic.core.adjustable import PVAdjustable
from slic.utils import typename
class Magnet(PVAdjustable):
def __init__(self,name):
self.name=name
pvsv='%s:I-SET' % name
pvrb='%s:I-READ' % name
tol = 0.075
super().__init__(pvsv,pvname_readback=pvrb,accuracy=tol,internal=True)
@property
def status(self):
return "Cycling"
def __repr__(self):
tn = typename(self)
return f"{tn} \"{self.name}\" is {self.status}"

6
interface/__init__.py Normal file
View File

@@ -0,0 +1,6 @@
from .snap import getSnap
from .snap import saveSnap
from .save import saveDataset
from .load import loadDataset
from .elog import writeElog
from .slic import SlicScan

25
interface/elog.py Normal file
View File

@@ -0,0 +1,25 @@
import os
import elog
def writeElog(text, Title = 'Test', Application = 'SFBD-Module', Attachment = None):
"""
Generates an entry in the electronic logbook of SwissFEL Commisisoning Data
:param text: The text to be placed in the log book
:param Title: Title of the log book entry
:param Application: Name of application which generates the log book entry
:param Attachment: List of attachments to be added to the log book (mostly plots)
:return: Message ID of log book entry
"""
# supplemental info
Author = os.getlogin()
Category = 'Measurement' # Info or Measurement
System = 'Beamdynamics' # Beamdynamics, Operation, Controls
dict_att = {'Author': Author, 'Application': Application, 'Category': Category, 'Title': Title, 'System': System}
logbook = elog.open('https://elog-gfa.psi.ch/SwissFEL+commissioning+data/', user='robot', password='robot')
return logbook.post(text, attributes=dict_att, attachments=Attachment)

58
interface/load.py Normal file
View File

@@ -0,0 +1,58 @@
import h5py
def loadDataset(filename):
hid = h5py.File(filename, "r")
snap = loadSnap(hid)
data = loadData(hid)
act = loadActuator(hid)
hid.close()
return data,act,snap
def loadSnap(hid):
snap={}
if not 'experiment' in hid.keys():
return None
for key1 in hid['experiment'].keys():
if isinstance(hid['experiment'][key1],h5py.Group):
for key2 in hid['experiment'][key1].keys():
val = hid['experiment'][key1][key2][()]
snap[key1+':'+key2]=val
else:
snap[key1]=hid['experiment'][key1][()]
return snap
def loadData(hid,scanrun=1):
run='scan_%d' % scanrun
data = {}
for key1 in hid[run]['data'].keys():
if isinstance(hid[run]['data'][key1],h5py.Group):
for key2 in hid[run]['data'][key1].keys():
val = hid[run]['data'][key1][key2][()]
data[key1+':'+key2]=val
else:
data[key1]=hid[run]['data'][key1][()]
return data
def loadActuator(hid,scanrun=1):
run='scan_%d' % scanrun
data = {}
if 'actuators' in hid[run]['method'].keys():
for key1 in hid[run]['method']['actuators'].keys():
if isinstance(hid[run]['method']['actuators'][key1],h5py.Group):
for key2 in hid[run]['method']['actuators'][key1].keys():
val = hid[run]['method']['actuators'][key1][key2][()]
data[key1+':'+key2]={'val':val}
else:
data[key1]=hid[run]['method']['actuators'][key1][()]
return data

189
interface/save.py Normal file
View File

@@ -0,0 +1,189 @@
import sys
import os
import datetime
import h5py
import socket
from PIL import Image
def getDatasetFileName(program='Unknown'):
year = datetime.datetime.now().strftime('%Y')
month = datetime.datetime.now().strftime('%m')
day = datetime.datetime.now().strftime('%d')
path = '/sf/data/measurements/%s' % year
if not os.path.exists(path):
os.makedirs(path)
path = '%s/%s' % (path,month)
if not os.path.exists(path):
os.makedirs(path)
path = '%s/%s' % (path,day)
if not os.path.exists(path):
os.makedirs(path)
datetag = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f')
filename=('%s/%s_%s' % (path, program.replace(' ','_'), datetag))
return filename
def saveDataset(program,data,actuator=None,snap=None,analysis=None,figures=None):
hid,filename = openDataset(program)
if not hid:
return None
# check if scan is multiple instances of a scan
if isinstance(data,list):
for iscan,singledata in enumerate(data):
writeData(hid,singledata,iscan)
else:
writeData(hid,data,1)
# same for actuator
if isinstance(actuator,list):
for iscan,singleactuator in enumerate(actuator):
writeActuator(hid,singleactuator,iscan)
else:
writeActuator(hid,actuator,1)
# and same for analysis
if isinstance(analysis,list):
for iscan,singleana in enumerate(analysis):
writeAnalysis(hid,singleana,iscan)
else:
writeAnalysis(hid,analysis,1)
# write aux data
writeSnap(hid,snap)
hid.close()
if figures:
writeFigure(filename,figures)
return filename
def openDataset(program):
if isinstance(program,str):
program={'Name':program,'Author':'Unknown','Version':'Unknown'}
if not isinstance(program,dict):
return None,None
if not 'Author' in program.keys():
program['Author']='Unknown'
if not 'Version' in program.keys():
program['Version']='Unknown'
filename=getDatasetFileName(program['Name'])
hid= h5py.File(filename+'.h5', "w")
# meta data header
dt = h5py.special_dtype(vlen=bytes)
dset=hid.create_dataset('general/user',(1,),dtype=dt)
dset[0]=os.getlogin()
dset=hid.create_dataset('general/application',(1,),dtype=dt)
dset[0]=program['Name']
dset=hid.create_dataset('general/author',(1,),dtype=dt)
dset[0]=program['Author']
dset=hid.create_dataset('general/version',(1,),dtype=dt)
dset[0]=program['Version']
dset=hid.create_dataset('general/created',(1,),dtype=dt)
dset[0]=str(datetime.datetime.now())
return hid,filename
def writeData(hid, data, scanrun=1):
# write the sensor raw value
for ele in data.keys():
name=ele.split(':')
if len(name)>1:
dset=hid.create_dataset('scan_%d/data/%s/%s' % (scanrun, name[0], name[1]), data=data[ele])
else:
dset=hid.create_dataset('scan_%d/data/%s' % (scanrun, name[0]), data=data[ele])
dset.attrs['system'] = getDatasetSystem(name[0])
dset.attrs['units'] = 'unknown'
# this part is obsolete - dimension should be given from the individual datasets
if not 'pid' in data.keys():
return
shape = data['pid'].shape
ndim = len(shape)
nsam = shape[-1]
nrec = 0
if ndim > 1:
nrec = shape[:-1][0]
hid.create_dataset("scan_%d/method/records" % scanrun,data=[nrec])
hid.create_dataset("scan_%d/method/samples" % scanrun,data=[nsam])
hid.create_dataset("scan_%d/method/dimension" % scanrun,data=[ndim])
hid.create_dataset("scan_%d/method/reducedData" % scanrun,data=[0]) # indicating that there is at least a 2D array for scalar data
def writeActuator(hid,act,scanrun=1):
if not act:
return
dt = h5py.special_dtype(vlen=bytes)
dset=hid.create_dataset("scan_%d/method/type" % scanrun,(1,),dtype=dt)
nact = len(act.keys())
if nact>0:
dset[0]='Scan'
else:
dset[0]='Time Recording'
for ele in act.keys():
name=ele.split(':')
if len(name)>1:
dset=hid.create_dataset("scan_%d/method/actuators/%s/%s" % (scanrun,name[0],name[1]),data=act[ele])
else:
dset=hid.create_dataset("scan_%d/method/actuators/%s" % (scanrun,name[0]),data=act[ele])
dset.attrs['system']=getDatasetSystem(name[0])
dset.attrs['units']='unknown'
def writeSnap(hid,val):
if not val:
return
for key in val.keys():
name=key.split(':')
if len(name)>1:
dset=hid.create_dataset('experiment/%s/%s' % (name[0],name[1]),data=val[key])
else:
dset=hid.create_dataset('experiment/%s/%s' % (name[0]),data=val[key])
dset.attrs['system']=getDatasetSystem(name[0])
dset.attrs['units']='unknown'
def writeAnalysis(hid,data,scanrun=1):
if not data:
return
for key in data.keys():
name=key.split(':')
if len(name)>1:
dset=hid.create_dataset('scan_%d/analysis/%s/%s' % (scanrun, name[0], name[1]), data=data[key])
else:
dset=hid.create_dataset('scan_%d/analysis/%s/%s' % (scanrun, name[0]), data=data[key])
dset.attrs['system']='analysis'
dset.attrs['units']='unknown'
def writeFigure(filename,figs):
for i,ele in enumerate(figs):
if ele == None:
continue
plotname='%s_Fig%d.png' % (filename,(i+1))
im = Image.open(ele)
im.save(plotname)
return None
def getDatasetSystem(name):
if len(name) > 11:
tag=name[8:9]
fulltag=name[8:12]
else:
tag=''
fulltag=''
sys='Unknown'
if tag =='P':
sys='Photonics'
if tag =='D':
sys='Diagnostics'
if fulltag =='DSCR':
sys='Camera'
if tag == 'R':
sys='RF'
if tag == 'M':
sys='Magnets'
if tag == 'U':
sys='Undulator'
return sys

98
interface/slic.py Normal file
View File

@@ -0,0 +1,98 @@
import h5py
import numpy as np
import time
from threading import Thread
from PyQt5.QtCore import QObject, pyqtSignal
# to do:
# 1 - check if scan thread is running
# 2 - import of BSread data
from sfbd.interface import getSnap
class SlicScan(QObject):
siginc = pyqtSignal(int, int) # signal for increment
sigterm = pyqtSignal(int) # signal for termination
sigsnap = pyqtSignal(bool)
def __init__(self):
QObject.__init__(self)
self.clear()
def clear(self):
self.daq = None
self.data = None
self.act = None
self.snap = None
self.doSnap = False
def start(self,daq,snap=False):
self.clear()
self.doSnap = snap
self.daq=daq
Thread(name='Scan-Monitor',target=self.Tmonitor).start()
def Tmonitor(self):
mythread = Thread(name='Slic-Scanner',target=self.Tscanner).start()
time.sleep(1)
ostep = -1
while(self.daq.running()):
istep,nstep=self.daq.status()
if istep>ostep:
ostep=istep
self.siginc.emit(istep,nstep)
time.sleep(1)
if not mythread == None: # wait till scanning thread is done
mythread.join()
istep,nstep=self.daq.status()
self.siginc.emit(istep,nstep)
self.data,self.act = importSlicScan(self.daq.info())
if hasattr(self.daq,'auxdata'):
self.data.update(self.daq.auxdata)
self.sigterm.emit(istep==nstep)
# if requested add snapshot acquisition
if self.doSnap:
self.startSnap()
def Tscanner(self):
self.daq.scan()
def startSnap(self):
Thread(name='Snap-Acquisition',target=self.Tsnap).start()
def Tsnap(self):
self.snap = getSnap()
print('Acquired snap')
self.sigsnap.emit(True)
def stop(self):
self.daq.stop()
def importSlicScan(scan_info):
if not isinstance(scan_info,dict):
return None,None
if not 'scan_files' in scan_info.keys():
return None,None
sfiles = scan_info['scan_files']
data = {}
for istep, sfile in enumerate(sfiles):
hid = h5py.File(sfile[0],'r')
for name, h5obj in hid.items():
if isinstance(h5obj,h5py.Dataset): # pv channels
data[name] = addDatasetToData(data,name,h5obj)
elif isinstance(h5obj,h5py.Group): # bs read channels
if 'data' in h5obj:
data[name] = addDatasetToData(data,name,h5obj['data'])
actuator = {}
name = scan_info['scan_parameters']['name'][0]
actuator[name]=np.array(scan_info['scan_values'])
data[name]=np.array(scan_info['scan_readbacks'])
return data,actuator
def addDatasetToData(data,name,h5obj):
if not name in data:
return np.array([h5obj[()]])
else:
return np.append(data[name],np.array([h5obj[()]]),axis=0)

82
interface/snap.py Normal file
View File

@@ -0,0 +1,82 @@
import numpy as np
import yaml
import os
import datetime
import epics
# things to do:
# 1. Read a snapshot file (not request file)
# 2. add parameters and performance channels (e.g. AT photon energies)
def parseSnapShotReqYAML(filename):
# read the snapshot request file
# returns a list of PV names
if not filename:
filename = '/sf/data/applications/snapshot/req/op/SF_settings.yaml'
pvs = []
path = os.path.dirname(filename)
with open(filename) as f:
try:
content = yaml.load(f, Loader=yaml.SafeLoader)
if 'include' in content.keys():
if len(content['include']) > 0:
for cont in content['include']:
retpv = parseSnapShotReqYAML(path+'/'+cont['name'])
if 'macros' in cont.keys():
retpv = applyReqMacro(retpv,cont['macros'])
pvs = pvs + retpv
if 'pvs' in content.keys():
if 'list' in content['pvs']:
for pv in content['pvs']['list']:
pvs.append(pv['name'])
return pvs
return None
except yaml.YAMLError as e:
print(e)
return None
return None
def applyReqMacro(pvs_in,macros):
pvs = []
for macro in macros:
for key in macro:
tag='$('+key+')'
for pv in pvs_in:
if tag in pv:
pvs.append(pv.replace(tag,macro[key]))
for pv in pvs_in: # copy the ones without macro
if not '$(' in pv:
pvs.append(pv)
return pvs
def getSnap(pvs=None):
if not isinstance(pvs,list):
pvs = parseSnapShotReqYAML(pvs)
if not pvs:
return
ret={}
val = epics.caget_many(pvs)
for i,pv in enumerate(pvs):
if val[i]: # filter out None values
ret[pv]=float(val[i])
# epics.ca.clear_cache()
return ret
def saveSnap(pvs={},label="", comment = "generated by application"):
filename = datetime.datetime.now().strftime('/sf/data/applications/snapshot/SF_settings_%Y%m%d_%H%M%S.snap')
with open(filename,'w') as fid:
fid.write('#{"labels":["%s"],"comment":"%s", "machine_parms":{}, "save_time": 0.0, "req_file_name": "SF_settings.yaml"}\n' % (label,comment))
for key in pvs.keys():
if isinstance(pvs[key],int):
fid.write('%s,{"val": %d}\n' % (key,pvs[key]))
elif isinstance(pvs[key],float):
fid.write('%s,{"val": %f}\n' % (key,pvs[key]))
elif isinstance(pvs[key],str):
fid.write('%s,{"val": %s}\n' % (key,pvs[key]))

View File

@@ -0,0 +1,33 @@
import signal
import ServerBase
class ServerTemplate(ServerBase.ServerBase):
def __init__(self, PVroot = 'MyServer', debug = False):
self.version='1.0.0'
self.program ='Server Template'
super(ServerTemplate, self).__init__(PVroot,debug,'127.0.0.1', 5678) # last too numbers are the IP adress and port of watchdog
# connect to the individual handler, which must have the function terminate() implemented
# each handler should be their own thread to not interfere with the main process-loop
self.handler={}
def terminateSubThreads(self):
for subserver in self.handler.keys():
self.handler[subserver].terminate()
if __name__ == '__main__':
debug = True
server = ServerTemplate('SF-BC-SERVER', debug)
signal.signal(signal.SIGTERM,server.terminate)
try:
server.run()
except KeyboardInterrupt:
server.terminate(None,None)

4
util/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
from .zmqbase import ZMQBase
from .serverbase import ServerBase

92
util/serverbase.py Normal file
View File

@@ -0,0 +1,92 @@
import sys
import signal
import os
import socket
import logging
import logging.handlers
from logging.handlers import RotatingFileHandler
from datetime import datetime
import time
from epics import PV
sys.path.append('/sf/bd/packages/sfbd')
from sfbd.util import ZMQBase
class ServerBase(ZMQBase):
def __init__(self, root = 'MyServer', debug = False, WDServer = '127.0.0.1', WDPort = 5678):
super(ServerBase,self).__init__(WDServer,WDPort)
self.debug = debug
self.root = root
self.suffix=''
if self.debug:
self.suffix='-SIMU'
self.host = socket.gethostname()
self.pid = os.getpid() # process ID
# enabling logging
self.logfilename="/sf/data/applications/BD-SERVER/%s.log" % self.root
handler = RotatingFileHandler(filename=self.logfilename,
mode='a',
maxBytes=5 * 1024 * 1024,
backupCount=1,
delay=0)
if self.debug:
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
datefmt='%m-%d %H:%M:%S')
else:
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
datefmt='%m-%d %H:%M:%S',
handlers=[handler,])
self.logger=logging.getLogger(self.program)
# setting up ZMQ interface
self.ZMQServerInfo(self.root,self.host,self.pid)
# individual channels of main thread
self.PVstop = PV('%s:STOP%s' % (self.root,self.suffix))
self.PVstop.value = 0
self.PVstop.add_callback(self.stop)
self.PVping = PV('%s:PING%s' % (self.root,self.suffix))
self.PVlog = PV('%s:LOG%s' % (self.root,self.suffix))
def stop(self,pvname=None,value=None,**kws):
if value > 0:
self.logger.info('PV:STOP triggered at %s' % datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
self.running=False
def start(self):
self.logger.info('Starting Server: %s at %s' % (self.root,datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
self.logger.info('PV Root: %s' % self.root)
self.logger.info('Version: %s' % self.version)
self.logger.info('Host: %s' % self.host)
self.logger.info('PID: %d' % self.pid)
if self.debug:
self.logger.info('Debug Mode')
def run(self):
self.start()
self.running=True
while self.running:
time.sleep(1)
if self.ZMQPoll():
self.logger.info('Watchdog Server requested termination')
self.running = False
self.PVping.value = datetime.now().strftime('Last active at %Y-%m-%d %H:%M:%S')
self.terminate(None,None)
def terminate(self,signum,frame):
# stopping any sub thread with the server specific function terminateSubThreads
self.terminateSubThreads()
self.logger.info('Terminating Server at %s' % datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
self.ZMQPoll('quit') # informing the watchdog
print('Bunch Compressor Server is quitting...')
sys.exit(0)

66
util/zmqbase.py Normal file
View File

@@ -0,0 +1,66 @@
import zmq
import socket
import sys
class ZMQBase:
def __init__(self, host='127.0.0.1',port = 5678):
self.host=host
self.port = port
self.msg={'action':'','PV':'','host':'','pid':0}
self.REQUEST_TIMEOUT = 500
self.REQUEST_RETRIES = 2
self.SERVER_ENDPOINT = "tcp://%s:%d" % (host,port)
self.serverIsOffline=False # assume that it is online
def ZMQServerInfo(self,PVroot,host,pid):
self.msg['PV']=PVroot
self.msg['host']=host
self.msg['pid']=pid
def ZMQPoll(self,tag='ping'):
self.msg['action']=tag
context = zmq.Context()
client = context.socket(zmq.REQ)
client.connect(self.SERVER_ENDPOINT)
client.send_pyobj(self.msg)
retries_left = self.REQUEST_RETRIES
while True:
if (client.poll(self.REQUEST_TIMEOUT) & zmq.POLLIN) != 0:
reply = client.recv_pyobj()
check = self.ZMQIdentifyReply(reply)
if self.serverIsOffline:
self.logger.info("Watchdog server came online")
self.serverIsOffline=False
if check:
return (reply['action'] == 'quit')
else:
self.logger.warning("Malformed reply from server")
continue
retries_left -= 1
# Socket is confused. Close and remove it.
client.setsockopt(zmq.LINGER, 0)
client.close()
if retries_left == 0:
if not self.serverIsOffline:
self.logger.info("Watchdog server seems to be offline")
self.serverIsOffline=True
return False
# Create new connection
client = context.socket(zmq.REQ)
client.connect(self.SERVER_ENDPOINT)
client.send_pyobj(self.msg)
def ZMQIdentifyReply(self,reply):
for field in ['PV','host','pid']:
if not reply[field] == self.msg[field]:
return False
return True