import time import numpy as np from bstrd import BSCache from bstrd.bscache import is_available from epics import PV class AdaptiveCompression: """ Wrapper class to bundle all daq/io needed for adaptive orbit feedback. """ def __init__(self): self.BS=None self.PVKick=[] self.PVFB = [] self.BSChannels=[] self.BS=None self.PVKick=None self.PVFB=None def getChannels(self,beamline,bunch): if beamline == 'Aramis': ch0 = ['SARFE10-PBIG050-EVR0:CALCI'] pvFB = ['SFB_COMPRESSION_BC1_AR:SP1','SFB_COMPRESSION_BC2_AR:SP1'] elif beamline == 'Athos': ch0 = ['SATFE10-PEPG046-EVR0:CALCI'] pvFB = ['SFB_COMPRESSION_BC1_AT:SP1','SFB_COMPRESSION_BC2_AT:SP1'] else: return False self.pvPhase = [] if bunch == 1: ch1 = ['SINBC02-DBCM410:LM-AG-CH2-B1', 'S10MA01-DCDR080:LM-AG-CH1-B1'] pvPhase = ['SINDLH:SET-BEAM-PHASE-OP','S10:SET-BEAM-PHASE-OP','SFB_COMPRESSION_BC1_AR:ONOFF1','SFB_COMPRESSION_BC2_AR:ONOFF1'] elif bunch == 2: ch1 = ['SINBC02-DBCM410:LM-AG-CH2-B2','S10MA01-DCDR080:LM-AG-CH1-B2'] pvPhase = ['SINDLH:SET-BEAM-PHASE-OP','S10:SET-BEAM-PHASE-OP','SFB_COMPRESSION_BC1_AT:ONOFF1','SFB_COMPRESSION_BC2_AT:ONOFF1'] else: return False self.monchannel=ch1 print("Initializing BSStream for beamline %s and bunch %d" % (beamline,bunch)) self.BS = self.initBSStream(ch0+ch1) if self.BS is None: print('Failed to establish BS-stream') return False print("Initializing EPICS Channels") self.PVFB=self.initPV(pvFB) self.PVPhase=self.initPV(pvPhase) if self.PVFB is None or self.PVPhase is None: print('Failed to establish PV-channels') return False return True def initBSStream(self,channels): ok = True for chn in channels: ok &= is_available(chn) if ok: bs = BSCache(1000,receive_timeout=1000) # 1000 second time out, capazity for 1000 second. bs.get_vars(channels) else: for chn in channels: if not is_available(chn): print('ERROR: BS-Channel %s not found in BS-Dispatcher' % ch) bs = None self.BSChannels.clear() self.BSChannels=channels return bs def initPV(self,chx): pvs = [] for x in chx: pvs.append(PV(x)) con = [pv.wait_for_connection(timeout=0.2) for pv in pvs] ok = True for i, val in enumerate(con): ok &=val if val is False: name = pv[i].pvname print("ERROR: PV-Channel %s is not available" % name) if ok: return pvs return None def flush(self): self.BS.flush() def terminate(self): print('Stopping BSStream Thread...') self.BS.stop() self.BS.pt.running.clear() # for some reason I have to def read(self): if self.BS is None: return None data=self.BS.__next__() return np.array([data['pid']]+[data[cnl] for cnl in self.BSChannels]) def readPV(self): if self.PVFB is None: return [] return [pv.value for pv in self.PVFB] def setPV(self,fbval): if self.PVFB is None: return for i in range(len(fbval)): self.PVFB[i].value = fbval[i] def getPVNames(self): if self.PVFB is None: return [] return [pv.pvname for pv in self.PVFB] def getDetectorName(self): return self.BSChannels[0] def getPhase(self): if self.PVPhase is None: return None return [pv.value for pv in self.PVPhase] def setPhase(self,vals): if self.PVPhase is None: return for i,val in enumerate(vals): self.PVPhase[i].value = val