Files
sfbd/app/adaptivecompression.py
2026-02-07 10:54:18 +01:00

135 lines
4.0 KiB
Python

import time
import numpy as np
from bstrd import BSCache
from bstrd.bscache import is_available
from epics import PV
class AdaptiveCompression:
"""
Wrapper class to bundle all daq/io needed for adaptive orbit feedback.
"""
def __init__(self):
self.BS=None
self.PVKick=[]
self.PVFB = []
self.BSChannels=[]
self.BS=None
self.PVKick=None
self.PVFB=None
def getChannels(self,beamline,bunch):
if beamline == 'Aramis':
ch0 = ['SARFE10-PBIG050-EVR0:CALCI']
pvFB = ['SFB_COMPRESSION_BC1_AR:SP1','SFB_COMPRESSION_BC2_AR:SP1']
elif beamline == 'Athos':
ch0 = ['SATFE10-PEPG046-EVR0:CALCI']
pvFB = ['SFB_COMPRESSION_BC1_AT:SP1','SFB_COMPRESSION_BC2_AT:SP1']
else:
return False
self.pvPhase = []
if bunch == 1:
ch1 = ['SINBC02-DBCM410:LM-AG-CH2-B1', 'S10MA01-DCDR080:LM-AG-CH1-B1']
pvPhase = ['SINDLH:SET-BEAM-PHASE-OP','S10:SET-BEAM-PHASE-OP','SFB_COMPRESSION_BC1_AR:ONOFF1','SFB_COMPRESSION_BC2_AR:ONOFF1']
elif bunch == 2:
ch1 = ['SINBC02-DBCM410:LM-AG-CH2-B2','S10MA01-DCDR080:LM-AG-CH1-B2']
pvPhase = ['SINDLH:SET-BEAM-PHASE-OP','S10:SET-BEAM-PHASE-OP','SFB_COMPRESSION_BC1_AT:ONOFF1','SFB_COMPRESSION_BC2_AT:ONOFF1']
else:
return False
self.monchannel=ch1
print("Initializing BSStream for beamline %s and bunch %d" % (beamline,bunch))
self.BS = self.initBSStream(ch0+ch1)
if self.BS is None:
print('Failed to establish BS-stream')
return False
print("Initializing EPICS Channels")
self.PVFB=self.initPV(pvFB)
self.PVPhase=self.initPV(pvPhase)
if self.PVFB is None or self.PVPhase is None:
print('Failed to establish PV-channels')
return False
return True
def initBSStream(self,channels):
ok = True
for chn in channels:
ok &= is_available(chn)
if ok:
bs = BSCache(1000,receive_timeout=1000) # 1000 second time out, capazity for 1000 second.
bs.get_vars(channels)
else:
for chn in channels:
if not is_available(chn):
print('ERROR: BS-Channel %s not found in BS-Dispatcher' % ch)
bs = None
self.BSChannels.clear()
self.BSChannels=channels
return bs
def initPV(self,chx):
pvs = []
for x in chx:
pvs.append(PV(x))
con = [pv.wait_for_connection(timeout=0.2) for pv in pvs]
ok = True
for i, val in enumerate(con):
ok &=val
if val is False:
name = pv[i].pvname
print("ERROR: PV-Channel %s is not available" % name)
if ok:
return pvs
return None
def flush(self):
self.BS.flush()
def terminate(self):
print('Stopping BSStream Thread...')
self.BS.stop()
self.BS.pt.running.clear() # for some reason I have to
def read(self):
if self.BS is None:
return None
data=self.BS.__next__()
return np.array([data['pid']]+[data[cnl] for cnl in self.BSChannels])
def readPV(self):
if self.PVFB is None:
return []
return [pv.value for pv in self.PVFB]
def setPV(self,fbval):
if self.PVFB is None:
return
for i in range(len(fbval)):
self.PVFB[i].value = fbval[i]
def getPVNames(self):
if self.PVFB is None:
return []
return [pv.pvname for pv in self.PVFB]
def getDetectorName(self):
return self.BSChannels[0]
def getPhase(self):
if self.PVPhase is None:
return None
return [pv.value for pv in self.PVPhase]
def setPhase(self,vals):
if self.PVPhase is None:
return
for i,val in enumerate(vals):
self.PVPhase[i].value = val