Added support for adaptive orbit feedback and spectral analysis

This commit is contained in:
2023-05-31 11:57:13 +02:00
parent b82b023f33
commit f22a17852c
8 changed files with 327 additions and 437 deletions

View File

@ -1 +1,2 @@
from .adaptiveorbit import AdaptiveOrbit from .adaptiveorbit import AdaptiveOrbit
from .spectralanalysis import SpectralAnalysis

View File

@ -61,7 +61,13 @@ class AdaptiveOrbit:
name = pv[i].pvname name = pv[i].pvname
raise ValueError(f"PV-Channel {name} is not available") raise ValueError(f"PV-Channel {name} is not available")
return pvs return pvs
def flush(self):
with self.bsAR.pt.queue.mutex:
self.bsAR.pt.queue.queue.clear()
with self.bsAT.pt.queue.mutex:
self.bsAT.pt.queue.queue.clear()
def terminate(self): def terminate(self):
print('Stopping BSStream Thread...') print('Stopping BSStream Thread...')
self.bsAR.stop() self.bsAR.stop()

51
app/spectralanalysis.py Normal file
View File

@ -0,0 +1,51 @@
import time
import numpy as np
from bstrd import BSCache
from bstrd.bscache import make_channel_config, is_available
from epics import PV
class SpectralAnalysis:
"""
Wrapper class to bundle all daq/io needed for adaptive orbit feedback.
"""
def __init__(self):
self.bs = BSCache()
self.bs.stop()
self.channel = ''
self.channels = ['SARFE10-PSSS059:SPECTRUM_Y',
'SATOP21-PMOS127-2D:SPECTRUM_Y',
'SATOP31-PMOS132-2D:SPECTRUM_Y']
self.isConnected = False
def connect(self,ich):
if ich < 0 or ich >= len(self.channels):
return False
self.channel = self.channels[ich]
print('Connecting to BS-Channel:',self.channel)
self.bs.channels.clear()
self.bs.get_var(self.channel) # this starts the stream into the cache
self.pv = PV(self.channel.replace('_Y','_X'))
def terminate(self):
print('Stopping BSStream Thread...')
self.bs.stop()
self.bs.pt.running.clear() # for some reason I have to
def flush(self):
with self.bs.pt.queue.mutex:
self.bs.pt.queue.queue.clear()
def read(self):
data=self.bs.__next__()
return data['pid'],data[self.channel]
def readPV(self):
return self.pv.value
def getSpectrometerName(self):
return self.channel

View File

@ -0,0 +1,4 @@
from .snap import getSnap
from .save import saveDataset
from .load import loadDataset
from .elog import writeElog

View File

@ -1,6 +1,7 @@
import os
import elog import elog
def write(text, Title = 'Test', Application = 'SFBD-Module', Attachment = None): def writeElog(text, Title = 'Test', Application = 'SFBD-Module', Attachment = None):
""" """
Generates an entry in the electronic logbook of SwissFEL Commisisoning Data Generates an entry in the electronic logbook of SwissFEL Commisisoning Data
:param text: The text to be placed in the log book :param text: The text to be placed in the log book
@ -11,12 +12,11 @@ def write(text, Title = 'Test', Application = 'SFBD-Module', Attachment = None):
""" """
# supplemental info # supplemental info
Author = 'sfop' Author = os.getlogin()
Category = 'Measurement' # Info or Measurement Category = 'Measurement' # Info or Measurement
System = 'Beamdynamics' # Beamdynamics, Operation, Controls System = 'Beamdynamics' # Beamdynamics, Operation, Controls
dict_att = {'Author': Author, 'Application': Application, 'Category': Category, 'Title': Title, 'System': System} dict_att = {'Author': Author, 'Application': Application, 'Category': Category, 'Title': Title, 'System': System}
print('\nLog book entry generated')
logbook = elog.open('https://elog-gfa.psi.ch/SwissFEL+commissioning+data/', user='robot', password='robot') logbook = elog.open('https://elog-gfa.psi.ch/SwissFEL+commissioning+data/', user='robot', password='robot')
return logbook.post(text, attributes=dict_att, attachments=Attachment) return logbook.post(text, attributes=dict_att, attachments=Attachment)

View File

@ -1,67 +1,51 @@
import sys
import os
import datetime
import h5py import h5py
# add other classes
#sys.path.append('/sf/bd/packages/SFBD/src')
class Load: def loadDataset(filename):
def __init__(self, logger = None, program = 'SFBD', version = 'v1.0.0'): hid = h5py.File(filename, "r")
snap = loadSnap(hid)
self.program = program data = loadData(hid)
self.version = version act = loadActuator(hid)
self.author ='S. Reiche' hid.close()
self.file = None return data,act,snap
if logger == None:
logging.basicConfig(level=logging.INFO,
format='%(levelname)-8s %(message)s')
self.logger = logging.getLogger(self.program)
self.logger.info('Load class started at %s' % datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
self.logger.info('Version: %s ' % self.version)
self.logger.info('Host: %s' % socket.gethostname())
else:
self.logger = logger
def open(self,filename):
self.file = h5py.File(filename, "r")
def close(self):
if self.file is not None:
self.file.close()
self.file = None
def loadSnap(self): def loadSnap(hid):
snap={} snap={}
if not 'experiment' in self.file.keys(): if not 'experiment' in hid.keys():
return snap return None
for key1 in self.file['experiment'].keys(): for key1 in hid['experiment'].keys():
for key2 in self.file['experiment'][key1].keys(): if isinstance(hid['experiment'][key1],h5py.Group):
val = self.file['experiment'][key1][key2][()] for key2 in hid['experiment'][key1].keys():
snap[key1+':'+key2]={'val':val} val = hid['experiment'][key1][key2][()]
return snap snap[key1+':'+key2]=val
else:
snap[key1]=hid['experiment'][key1][()]
return snap
def loadData(self,scanrun=1): def loadData(hid,scanrun=1):
run='scan_%d' % scanrun run='scan_%d' % scanrun
data = {} data = {}
for key1 in self.file[run]['data'].keys(): for key1 in hid[run]['data'].keys():
for key2 in self.file[run]['data'][key1].keys(): if isinstance(hid[run]['data'][key1],h5py.Group):
val = self.file[run]['data'][key1][key2][()] for key2 in hid[run]['data'][key1].keys():
val = hid[run]['data'][key1][key2][()]
data[key1+':'+key2]=val data[key1+':'+key2]=val
return data else:
data[key1]=hid[run]['data'][key1][()]
return data
def loadActuator(self,scanrun=1): def loadActuator(hid,scanrun=1):
run='scan_%d' % scanrun run='scan_%d' % scanrun
data = {} data = {}
if 'actuators' in self.file[run]['method'].keys(): if 'actuators' in hid[run]['method'].keys():
for key1 in self.file[run]['method']['actuators'].keys(): for key1 in hid[run]['method']['actuators'].keys():
for key2 in self.file[run]['method']['actuators'][key1].keys(): if isinstance(hid[run]['method']['actuators'],h5py.Group):
val = self.file[run]['method']['actuators'][key1][key2][()] for key2 in hid[run]['method']['actuators'][key1].keys():
val = hid[run]['method']['actuators'][key1][key2][()]
data[key1+':'+key2]={'val':val} data[key1+':'+key2]={'val':val}
return data else:
data[key1]=hid[run]['method']['actuators'][key1][()]
return data

View File

@ -2,147 +2,181 @@ import sys
import os import os
import datetime import datetime
import h5py import h5py
import logging
import socket import socket
from PIL import Image
# add other classes def getDatasetFileName(program='Unknown'):
#sys.path.append('/sf/bd/packages/SFBD/src') year = datetime.datetime.now().strftime('%Y')
month = datetime.datetime.now().strftime('%m')
day = datetime.datetime.now().strftime('%d')
class Save: path = '/sf/data/measurements/%s' % year
def __init__(self, logger = None, program = 'SFBD', version = 'v1.0.0'): if not os.path.exists(path):
os.makedirs(path)
path = '%s/%s' % (path,month)
if not os.path.exists(path):
os.makedirs(path)
path = '%s/%s' % (path,day)
if not os.path.exists(path):
os.makedirs(path)
datetag = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f')
filename=('%s/%s_%s' % (path, program.replace(' ','_'), datetag))
return filename
self.program = program
self.version = version
self.author ='S. Reiche'
self.filename=None
self.file = None
if logger == None:
logging.basicConfig(level=logging.INFO,
format='%(levelname)-8s %(message)s')
self.logger = logging.getLogger(self.program)
self.logger.info('Save class started at %s' % datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
self.logger.info('Version: %s ' % self.version)
self.logger.info('Host: %s' % socket.gethostname())
else:
self.logger = logger
def saveDataset(program,data,actuator=None,snap=None,analysis=None,figures=None):
hid,filename = openDataset(program)
if not hid:
return None
# check if scan is multiple instances of a scan
if isinstance(data,list):
for iscan,singledata in enumerate(data):
writeData(hid,singledata,iscan)
else:
writeData(hid,data,1)
# same for actuator
if isinstance(actuator,list):
for iscan,singleactuator in enumerate(actuator):
writeActuator(hid,singleactuator,iscan)
else:
writeActuator(hid,actuator,1)
# and same for analysis
if isinstance(analysis,list):
for iscan,singleana in enumerate(analysis):
writeAnalysis(hid,singleana,iscan)
else:
writeAnalysis(hid,analysis,1)
# write aux data
writeSnap(hid,snap)
hid.close()
writeFigure(filename,figures)
def open(self): return filename
year = datetime.datetime.now().strftime('%Y')
month = datetime.datetime.now().strftime('%m')
day = datetime.datetime.now().strftime('%d')
path = '/sf/data/measurements/%s' % year
if not os.path.exists(path):
os.makedirs(path)
path = '%s/%s' % (path,month)
if not os.path.exists(path):
os.makedirs(path)
path = '%s/%s' % (path,day)
if not os.path.exists(path):
os.makedirs(path)
datetag = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f')
self.filename=('%s/%s_%s' % (path, self.program.replace(' ','_'), datetag))
self.file = h5py.File(self.filename+'.h5', "w")
# meta data header
dt = h5py.special_dtype(vlen=bytes)
dset=self.file.create_dataset('general/user',(1,),dtype=dt)
dset[0]=os.getlogin()
dset=self.file.create_dataset('general/application',(1,),dtype=dt)
dset[0]=self.program
dset=self.file.create_dataset('general/author',(1,),dtype=dt)
dset[0]=self.author
dset=self.file.create_dataset('general/version',(1,),dtype=dt)
dset[0]=self.version
dset=self.file.create_dataset('general/created',(1,),dtype=dt)
dset[0]=str(datetime.datetime.now())
def close(self): def openDataset(program):
if self.file is not None: if isinstance(program,str):
self.file.close() program={'Name':program,'Author':'Unknown','Version':'Unknown'}
self.file = None if not isinstance(program,dict):
return None,None
if not 'Author' in program.keys():
program['Author']='Unknown'
if not 'Version' in program.keys():
program['Version']='Unknown'
filename=getDatasetFileName(program['Name'])
hid= h5py.File(filename+'.h5', "w")
# meta data header
dt = h5py.special_dtype(vlen=bytes)
dset=hid.create_dataset('general/user',(1,),dtype=dt)
dset[0]=os.getlogin()
dset=hid.create_dataset('general/application',(1,),dtype=dt)
dset[0]=program['Name']
dset=hid.create_dataset('general/author',(1,),dtype=dt)
dset[0]=program['Author']
dset=hid.create_dataset('general/version',(1,),dtype=dt)
dset[0]=program['Version']
dset=hid.create_dataset('general/created',(1,),dtype=dt)
dset[0]=str(datetime.datetime.now())
return hid,filename
def writeData(hid, data, scanrun=1):
if not 'pid' in data.keys():
return
shape = data['pid'].shape
ndim = len(shape)
nsam = shape[-1]
nrec = 0
if ndim > 1:
nrec = shape[:-1][0]
hid.create_dataset("scan_%d/method/records" % scanrun,data=[nrec])
hid.create_dataset("scan_%d/method/samples" % scanrun,data=[nsam])
hid.create_dataset("scan_%d/method/dimension" % scanrun,data=[ndim])
hid.create_dataset("scan_%d/method/reducedData" % scanrun,data=[0]) # indicating that there is at least a 2D array for scalar data
# write the sensor raw value
for ele in data.keys():
name=ele.split(':')
if len(name)>1:
dset=hid.create_dataset('scan_%d/data/%s/%s' % (scanrun, name[0], name[1]), data=data[ele])
else:
dset=hid.create_dataset('scan_%d/data/%s' % (scanrun, name[0]), data=data[ele])
dset.attrs['system'] = getDatasetSystem(name[0])
dset.attrs['units'] = 'unknown'
def writeActuator(hid,act,scanrun=1):
if not act:
return
dt = h5py.special_dtype(vlen=bytes)
dset=hid.create_dataset("scan_%d/method/type" % scanrun,(1,),dtype=dt)
nact = len(act.keys())
if nact>0:
dset[0]='Scan'
else:
dset[0]='Time Recording'
for ele in act.keys():
name=ele.split(':')
if len(name)>1:
dset=hid.create_dataset("scan_%d/method/actuators/%s/%s" % (scanrun,name[0],name[1]),data=act[ele])
else:
dset=hid.create_dataset("scan_%d/method/actuators/%s" % (scanrun,name[0]),data=act[ele])
dset.attrs['system']=getDatasetSystem(name[0])
dset.attrs['units']='unknown'
def writeSnap(hid,val):
if not val:
return
for key in val.keys():
name=key.split(':')
if len(name)>1:
dset=hid.create_dataset('experiment/%s/%s' % (name[0],name[1]),data=val[key])
else:
dset=hid.create_dataset('experiment/%s/%s' % (name[0]),data=val[key])
dset.attrs['system']=getDatasetSystem(name[0])
dset.attrs['units']='unknown'
def writeAnalysis(hid,data,scanrun=1):
if not data:
return
for key in data.keys():
name=key.split(':')
if len(name)>1:
dset=hid.create_dataset('scan_%d/analysis/%s/%s' % (scanrun, name[0], name[1]), data=data[key])
else:
dset=hid.create_dataset('scan_%d/analysis/%s/%s' % (scanrun, name[0]), data=data[key])
dset.attrs['system']='analysis'
dset.attrs['units']='unknown'
def writeFigure(filename,figs):
for i,ele in enumerate(figs):
plotname='%s_Fig%d.png' % (filename,(i+1))
im = Image.open(ele)
im.save(plotname)
return None
def getDatasetSystem(name):
def writeSnap(self,val): if len(name) > 11:
for key in val.keys(): tag=name[8:9]
name=key.split(':') fulltag=name[8:12]
if 'value' in val[key].keys(): else:
data=val[key]['value'] tag=''
elif 'val' in val[key].keys(): fulltag=''
data=val[key]['val'] sys='Unknown'
else: if tag =='P':
continue sys='Photonics'
dset=self.file.create_dataset('experiment/%s/%s' % (name[0],name[1]),data=[data]) if tag =='D':
dset.attrs['system']=self.getSystem(name[0]) sys='Diagnostics'
dset.attrs['units']='unknown' if fulltag =='DSCR':
sys='Camera'
def writeAnalysis(self,data,scanrun=1): if tag == 'R':
for key1 in data.keys(): sys='RF'
for key2 in data[key1].keys(): if tag == 'M':
dset=self.file.create_dataset('scan_%d/analysis/%s/%s' % (scanrun, key1, key2), sys='Magnets'
data=data[key1][key2]) if tag == 'U':
sys='Undulator'
def writeData(self, data, scanrun=1): return sys
if not 'Shot:ID' in data.keys():
return
shape = data['Shot:ID'].shape
ndim = len(shape)
nsam = shape[-1]
nrec = 0
if ndim > 1:
nrec = shape[:-1][0]
self.file.create_dataset("scan_%d/method/records" % scanrun,data=[nrec])
self.file.create_dataset("scan_%d/method/samples" % scanrun,data=[nsam])
self.file.create_dataset("scan_%d/method/dimension" % scanrun,data=[ndim])
self.file.create_dataset("scan_%d/method/reducedData" % scanrun,data=[0]) # indicating that there is at least a 2D array for scalar data
# write the sensor raw value
for ele in data.keys():
name=ele.split(':')
dset=self.file.create_dataset('scan_%d/data/%s/%s' % (scanrun, name[0], name[1]), data=data[ele])
dset.attrs['system'] = self.getSystem(name[0])
dset.attrs['units'] = 'unknown'
def writeActuator(self,act,scanrun=1):
dt = h5py.special_dtype(vlen=bytes)
dset=self.file.create_dataset("scan_%d/method/type" % scanrun,(1,),dtype=dt)
if act.isActuator:
dset[0]='Scan'
else:
dset[0]='Time Recording'
for ele in act.actuators.keys():
name=ele.split(':')
dset=self.file.create_dataset("scan_%d/method/actuators/%s/%s" % (scanrun,name[0],name[1]),data=act.actuators[ele]['val'])
dset.attrs['system']=self.getSystem(name[0])
dset.attrs['units']='unknown'
def getSystem(self,name):
if len(name) > 11:
tag=name[8:9]
fulltag=name[8:12]
else:
tag=''
fulltag=''
sys='Unknown'
if tag =='P':
sys='Photonics'
if tag =='D':
sys='Diagnostics'
if fulltag =='DSCR':
sys='Camera'
if tag == 'R':
sys='RF'
if tag == 'M':
sys='Magnets'
if tag == 'U':
sys='Undulator'
return sys

View File

@ -1,261 +1,71 @@
import datetime
import re
import numpy as np import numpy as np
import yaml import yaml
import os import os
import json
import copy
from threading import Thread
import epics import epics
class Snap: # things to do:
def __init__(self,filename='/sf/data/applications/snapshot/req/op/SF_settings.yaml'): # 1. Read a snapshot file (not request file)
# 2. Save a snapshot file
# 3. add parameters and performance channels (e.g. AT photon energies)
self.pvs = self.parseYAML(filename) def parseSnapShotReqYAML(filename):
self.doAbort = False # read the snapshot request file
# returns a list of PV names
if not filename:
filename = '/sf/data/applications/snapshot/req/op/SF_settings.yaml'
pvs = []
path = os.path.dirname(filename)
with open(filename) as f:
try:
content = yaml.load(f, Loader=yaml.SafeLoader)
if 'include' in content.keys():
if len(content['include']) > 0:
for cont in content['include']:
retpv = parseSnapShotReqYAML(path+'/'+cont['name'])
if 'macros' in cont.keys():
retpv = applyReqMacro(retpv,cont['macros'])
pvs = pvs + retpv
if 'pvs' in content.keys():
if 'list' in content['pvs']:
for pv in content['pvs']['list']:
pvs.append(pv['name'])
return pvs
return None
except yaml.YAMLError as e:
print(e)
return None
return None
def my_caget_many(self): def applyReqMacro(pvs_in,macros):
# this skips quite some channels - don't know why? pvs = []
for macro in macros:
for key in macro:
tag='$('+key+')'
for pv in pvs_in:
if tag in pv:
pvs.append(pv.replace(tag,macro[key]))
for pv in pvs_in: # copy the ones without macro
if not '$(' in pv:
pvs.append(pv)
return pvs
pvdata = {} def getSnap(pvs=None):
pvchids = [] if not isinstance(pvs,list):
# create, don't connect or create callbacks pvs = parseSnapShotReqYAML(pvs)
for name in self.pvs: if not pvs:
chid = epics.ca.create_channel(name, connect=False, auto_cb=False) # note 1 return
pvchids.append(chid) ret={}
val = epics.caget_many(pvs)
# connect for i,pv in enumerate(pvs):
for chid in pvchids: if val[i]: # filter out None values
print(epics.ca.name(chid)) ret[pv]=float(val[i])
epics.ca.connect_channel(chid) epics.ca.clear_cache()
return ret
# request get, but do not wait for result
epics.ca.poll()
for chid in pvchids:
print(epics.ca.name(chid))
epics.ca.get(chid, wait=False) # note 2
# now wait for get() to complete
epics.ca.poll()
for chid in pvchids:
print(epics.ca.name(chid))
val = epics.ca.get_complete(chid,timeout=0.5)
if not val:
pvdata[epics.ca.name(chid)] = np.array([val])
epics.ca.clear_cache()
return pvdata
def abort(self):
self.doAbort=True
print('Aborting Snap')
def getSnapValues(self,force=True):
self.doAbort=False
ret={}
if self.pvs:
# ret=self.my_caget_many()
val = epics.caget_many(self.pvs)
for i,pv in enumerate(self.pvs):
if val[i]: # filter out None values
ret[pv]={'val':float(val[i])}
epics.ca.clear_cache()
return ret,{}
#-------------
# routines to parse the OP YAML file
def applyMacro(self,pvs_in,macros):
pvs = []
for macro in macros:
for key in macro:
tag='$('+key+')'
for pv in pvs_in:
if tag in pv:
pvs.append(pv.replace(tag,macro[key]))
for pv in pvs_in: # copy the ones without macro
if not '$(' in pv:
pvs.append(pv)
return pvs
def parseYAML(self,filename='/sf/data/applications/snapshot/req/op/SF_settings.yaml'):
pvs = []
path = os.path.dirname(filename)
with open(filename) as f:
try:
content = yaml.load(f, Loader=yaml.SafeLoader)
if 'include' in content.keys():
if len(content['include']) > 0:
for cont in content['include']:
retpv = self.parseYAML(path+'/'+cont['name'])
if 'macros' in cont.keys():
retpv = self.applyMacro(retpv,cont['macros'])
pvs = pvs + retpv
if 'pvs' in content.keys():
if 'list' in content['pvs']:
for pv in content['pvs']['list']:
pvs.append(pv['name'])
return pvs
return None
except yaml.YAMLError as e:
print(e)
return None
return None
class Old:
def __init__(self):
self.filename = filename
print('Estbalishing snapshot with request file:', filename,flush=True)
self.savepath = savepath
self.tolerance = 0.0005
self.pvnames = []
self.pvs = []
self.mppvnames = []
self.mppvs = []
self.machinepar = []
self.message = ''
if self.filename:
self.openRequestFile(self.filename)
def openRequestFile(self, filename):
self.filename = filename
self.rootname = self.filename.split('/')[-1]
isReq = True
if '.yaml' in filename:
isReq = False
# req_file = SnapshotReqFile(path=str(self.filename))
if newVersion:
if '.yaml' in filename:
req_file = SnapshotJsonFile(path=str(self.filename))
else:
req_file = SnapshotReqFile(path=str(self.filename))
pvs_list= req_file.read()[0]
print('PV List:-------------------------')
for i in range(len(pvs_list)):
print(pvs_list[i])
print(req_file.read()[1])
else:
if '.yaml' in filename:
self.filename=None
self.rootname = None
print('YAML files not supported')
return
req_file = SnapshotReqFile(str(self.filename))
pvs_list = req_file.read()
self.pvnames.clear()
self.machinepar.clear()
for ele in pvs_list:
if isinstance(ele, list):
self.pvnames = ele
elif isinstance(ele, dict):
if 'machine_params' in ele.keys():
self.machinepar = ele['machine_params']
Thread(target=self.connectPVs).start()
def connectPVs(self):
self.pvs = [PV(pv, auto_monitor=False) for pv in self.pvnames]
con = [pv.wait_for_connection(timeout=0.2) for pv in self.pvs]
pvscon=[]
for i, val in enumerate(con):
if val is False:
print('Cannot connect to PV:', self.pvs[i].pvname,flush=True)
else:
pvscon.append(self.pvs[i])
self.pvs = copy.deepcopy(pvscon)
if isinstance(self.machinepar,list):
self.mppvs = [PV(self.machinepar[key], auto_monitor=False) for key in self.machinepar]
else:
self.mppvs = [PV(self.machinepar[key], auto_monitor=False) for key in self.machinepar.keys()]
con = [pv.wait_for_connection(timeout=0.2) for pv in self.mppvs]
pvscon.clear()
for i, val in enumerate(con):
if val is False:
print('Cannot connect to mPV:', self.mppvs[i].pvname,flush=True)
else:
pvscon.append(self.mppvs[i])
self.mppvs=copy.deepcopy(pvscon)
def getSnapValues(self, force=True):
values = {}
val = [pv.get(timeout=0.6, use_monitor=False) for pv in self.pvs]
for i, pv in enumerate(self.pvs):
if val[i] is None:
if force:
continue
else:
return False
else:
values[pv.pvname] = {
"raw_name": pv.pvname, "val": val[i], "EGU": pv.units, "prec": pv.precision}
mvalues = {}
val = [pv.get(timeout=0.6, use_monitor=False) for pv in self.mppvs]
for i, pv in enumerate(self.mppvs):
if val[i] is None:
if force:
continue
else:
return False
else:
mvalues[pv.pvname] = {"value": val[i],
"units": pv.units, "precision": pv.precision}
return values, mvalues
def save(self, labels=[], comment="Generated by SFBD-Package", force=True):
if self.filename is None:
self.message = 'No Request File Loaded'
return False
val, mval = self.getSnapValues(force)
if isinstance(val, bool) and val == False:
self.message = 'Unsuccesful reading of PV channels (unforced access)'
return False
# construct file name
datetag = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
root = self.rootname.split('.req')[0]
files = self.savepath+root+'_'+datetag+'.snap'
filel = self.savepath+root+'_latest.snap'
# reshuffle from mval to keyword based machine values
mmval = {}
for key in self.machinepar.keys():
if self.machinepar[key] in mval.keys():
mmval[key] = mval[self.machinepar[key]]
# save file
parse_to_save_file(
val, files, macros=None, symlink_path=filel, comment=comment,
labels=[],
req_file_name=self.rootname, machine_params=mmval)
self.message = 'Snapshot saved to '+files
return True
def restore(self,filename,refilter='',force=True):
filepath=self.savepath+filename
prog=re.compile(refilter)
save_pvs=parse_from_save_file(filepath)
res={}
for ele in save_pvs:
if isinstance(ele,dict):
for key in ele.keys():
if prog.match(key):
res[key]=ele[key]['value']
for pv in self.pvs:
if pv.pvname in res.keys():
val=pv.get()
if val is None or np.abs(val-res[pv.pvname]) > self.tolerance:
pv.put(res[pv.pvname])
self.message ='Snap restored'