Filling template for the interface module (loading, saving, elog etc)

This commit is contained in:
2023-05-23 16:39:27 +02:00
parent cb3a2ba501
commit b82b023f33
6 changed files with 622 additions and 37 deletions

View File

@ -1,53 +1,128 @@
import time
import numpy as np
from sfbd.ext.reichebscombined import ReicheBSCombined
from slic.core.adjustable import PVAdjustable
from bstrd import BSCache
from bstrd.bscache import make_channel_config, is_available
from epics import PV
class AdaptiveOrbit:
"""
Wrapper class to bundle all daq/io needed for adaptive orbit feedback.
"""
def __init__(self,beamline='Aramis'):
self.beamline=None
self.sensor = None
self.actuator = None
self.getBeamline(beamline)
def __init__(self):
# Aramis Channels
self.ARch0 = 'SARFE10-PBIG050-EVR0:CALCI'
self.ARchx = ['SARUN%2.2d-DBPM070:X1' % id for id in range(1,17)]
self.ARchy = ['SARUN%2.2d-DBPM070:Y1' % id for id in range(1,17)]
self.bsAR = self.initBSStream([self.ARch0]+self.ARchx+self.ARchy)
self.pvAR = self.initPV(self.ARchx)
self.kickerAR = self.initPV(['SARMA02-MCRX050:I-SET','SARMA02-MCRY050:I-SET','SARUN02-MCRX080:I-SET','SARUN02-MCRY080:I-SET','SFB_ORBIT_SAR:ONOFF1'])
# Athos Channels
self.ATch0 = 'SATFE10-PEPG046-EVR0:CALCI'
self.ATchx=[]
self.ATchy=[]
for bpm in range(5,23):
idx = '070'
if bpm == 5 or bpm ==14:
idx='410'
self.ATchx.append('SATUN%2.2d-DBPM%s:X1' % (bpm,idx))
self.ATchy.append('SATUN%2.2d-DBPM%s:Y1' % (bpm,idx))
self.bsAT = self.initBSStream([self.ATch0]+self.ATchx+self.ATchy)
self.pvAT = self.initPV(self.ATchx)
self.kickerAR = self.initPV(['SATMA01-MCRX610:I-SET','SATMA01-MCRY610:I-SET','SATUN05-MCRX420:I-SET','SATUN05-MCRY420:I-SET','SFB_ORBIT_SAT:ONOFF1'])
# select first beamline
self.isAramis = True
def initBSStream(self,channels):
print("Initializing BSstream")
bs = BSCache()
bs.stop()
for cnl in channels[1:]:
if not is_available(cnl):
raise ValueError(f"BS-Channel {cbl} is not available")
res = make_channel_config(cnl,None,None)
bs.channels[res]=res
bs.get_var(channels[0]) # this starts also the stream into the cache
return bs
def initPV(self,chx):
print("Initializing EPICS Channels")
pvs = []
for x in chx:
pvs.append(PV(x.replace(':X1',':X-REF-FB')))
pvs.append(PV(x.replace(':X1',':Y-REF-FB')))
con = [pv.wait_for_connection(timeout=0.2) for pv in pvs]
for i, val in enumerate(con):
if val is False:
name = pv[i].pvname
raise ValueError(f"PV-Channel {name} is not available")
return pvs
def terminate(self):
print('Stopping BSStream Thread...')
self.bsAR.stop()
self.bsAR.pt.running.clear() # for some reason I have to
self.bsAT.stop()
self.bsAT.pt.running.clear() # for some reason I have to
def getBeamline(self,beamline):
if beamline == 'Aramis':
self.isAramis = True
else:
self.isAthos = True
# all routine for accessing the machine (read/write)
def read(self):
if not self.sensor or not self.beamline:
return None
data=self.sensor.get_current_value()
if not data:
return None
retval={'pid':data['pid'],
'Signal':data[self.channel0[0]],
'X':np.array([data[i] for i in self.channelX]),
'Y':np.array([data[i] for i in self.channelY])}
return retval
if self.isAramis:
data=self.bsAR.__next__()
return data['pid'],data[self.ARch0],np.array([data[cnl] for cnl in self.ARchx]),np.array([data[cnl] for cnl in self.ARchy])
data=self.bsAT.__next__()
return data['pid'],data[self.ATch0],np.array([data[cnl] for cnl in self.ATchx]),np.array([data[cnl] for cnl in self.ATchy])
def read_adj(self):
if not self.actuator or not self.beamline:
return None
return [pv.get_current_value() for pv in self.actuator]
def readPV(self):
if self.isAramis:
return [pv.value for pv in self.pvAR]
return [pv.value for pv in self.pvAT]
def getBeamline(self,beamline='Aramis'):
if beamline.upper() == 'ARAMIS':
self.beamline='Aramis'
self.channelX=['SARUN%2.2d-DBPM070:X1' % id for id in range(1,17)]
self.channelY=['SARUN%2.2d-DBPM070:Y1' % id for id in range(1,17)]
self.channel0 = ['SARFE10-PBIG050-EVR0:CALCI']
FB_bpms = [x.replace(':X1',':X-REF-FB') for x in self.channelX] + [x.replace(':Y1',':Y-REF-FB') for x in self.channelY]
elif beamline.upper() == 'ATHOS':
self.beamline=None
else:
self.beamline=None
def setPV(self,fbval):
if self.isAramis:
for i in range(len(fbval)):
self.pvAR[i].value = fbval[i]
return
for i in range(len(fbval)):
self.pvAT[i].value = fbval[i]
def getPVNames(self):
if self.isAramis:
return [pv.pvname for pv in self.pvAR]
return [pv.pvname for pv in self.pvAT]
def getDetectorName(self):
if self.isAramis:
return self.ARch0
return self.ATch0
def getKicker(self):
if self.isAramis:
return [pv.value for pv in self.kickerAR]
return [pv.value for pv in self.kickerAT]
def setKicker(self,vals):
return
if self.isAramis:
for i,val in enumerate(vals):
self.kickerAR[i].value = val
return
channels=self.channel0+self.channelX+self.channelY
self.sensor = ReicheBSCombined('AdapiveOrbit',channels)
self.actuator = [PVAdjustable(pv) for pv in FB_bpms]
for i,val in enumerate(vals):
self.kickerAT[i].value = val