import time

import numpy as np
from bstrd import BSCache
from bstrd.bscache import make_channel_config, is_available
from epics import PV


class AdaptiveOrbit:
    """
    Wrapper class to bundle all daq/io needed for adaptive orbit feedback.

    Channels for two beamlines (Aramis and Athos) are set up on
    construction: one BS stream per beamline carrying the detector signal
    and the BPM X/Y readings, the EPICS PVs of the BPM feedback references
    and the EPICS PVs of the kickers. The flag ``isAramis`` selects which
    beamline the read/write methods operate on.
    """

    def __init__(self):
        # Aramis Channels
        self.ARch0 = 'SARFE10-PBIG050-EVR0:CALCI'
        self.ARchx = [f'SARUN{n:02d}-DBPM070:X1' for n in range(1, 17)]
        self.ARchy = [f'SARUN{n:02d}-DBPM070:Y1' for n in range(1, 17)]
        self.bsAR = self.initBSStream([self.ARch0] + self.ARchx + self.ARchy)
        self.pvAR = self.initPV(self.ARchx)
        self.kickerAR = self.initPV([
            'SARMA02-MCRX050:I-SET',
            'SARMA02-MCRY050:I-SET',
            'SARUN02-MCRX080:I-SET',
            'SARUN02-MCRY080:I-SET',
            'SFB_ORBIT_SAR:ONOFF1',
        ])

        # Athos Channels
        self.ATch0 = 'SATFE10-PEPG046-EVR0:CALCI'
        self.ATchx = []
        self.ATchy = []
        for bpm in range(5, 23):
            # BPMs 5 and 14 use device index 410, all others use 070.
            idx = '410' if bpm in (5, 14) else '070'
            self.ATchx.append(f'SATUN{bpm:02d}-DBPM{idx}:X1')
            self.ATchy.append(f'SATUN{bpm:02d}-DBPM{idx}:Y1')
        self.bsAT = self.initBSStream([self.ATch0] + self.ATchx + self.ATchy)
        self.pvAT = self.initPV(self.ATchx)
        self.kickerAT = self.initPV([
            'SATMA01-MCRX610:I-SET',
            'SATMA01-MCRY610:I-SET',
            'SATUN05-MCRX420:I-SET',
            'SATUN05-MCRY420:I-SET',
            'SFB_ORBIT_SAT:ONOFF1',
        ])

        # select first beamline
        self.isAramis = True

    def initBSStream(self, channels):
        """Create a BSCache, register ``channels`` on it and return it."""
        print("Initializing BSstream")
        # Original author's note: 1000 second time out, capacity for
        # 1000 seconds.
        # NOTE(review): the units of both arguments cannot be confirmed
        # from this file - verify against the BSCache documentation.
        bs = BSCache(100000, receive_timeout=100000)
        bs.get_vars(channels)
        return bs

    def initPV(self, chx):
        """
        Create and connect the EPICS PVs derived from the names in ``chx``.

        A name containing ``:X1`` or ``:X2`` yields two PVs, in this order:
        the name with that suffix replaced by ``:X-REF-FB`` and by
        ``:Y-REF-FB``. Any other name yields a single PV with that exact
        name.

        Returns:
            The list of PV objects, in creation order.

        Raises:
            ValueError: if a PV does not connect within 0.2 seconds.
        """
        print("Initializing EPICS Channels")
        pvs = []
        for channel in chx:
            for suffix in (':X1', ':X2'):
                if suffix in channel:
                    pvs.append(PV(channel.replace(suffix, ':X-REF-FB')))
                    pvs.append(PV(channel.replace(suffix, ':Y-REF-FB')))
                    break
            else:
                pvs.append(PV(channel))

        # Request all connections first, then report the first failure.
        connected = [pv.wait_for_connection(timeout=0.2) for pv in pvs]
        for pv, is_connected in zip(pvs, connected):
            if not is_connected:
                # The name must come from the PV list: the comprehension
                # variable above is not visible outside the comprehension.
                raise ValueError(
                    f"PV-Channel {pv.pvname} is not available")
        return pvs

    def flush(self):
        """Flush the BS streams of both beamlines."""
        self.bsAR.flush()
        self.bsAT.flush()

    def terminate(self):
        """Stop the BS stream threads of both beamlines."""
        print('Stopping BSStream Thread...')
        for stream in (self.bsAR, self.bsAT):
            stream.stop()
            # Original author's note: "for some reason I have to" - the
            # running flag is cleared explicitly in addition to stop().
            stream.pt.running.clear()

    def getBeamline(self, beamline):
        """
        Select the active beamline.

        Despite its name this method is a setter: ``'Aramis'`` selects
        Aramis, any other value selects Athos.
        """
        self.isAramis = beamline == 'Aramis'

    # all routine for accessing the machine (read/write)
    def read(self):
        """
        Read the next BS stream message of the active beamline.

        Returns:
            A tuple ``(pid, detector, x, y)``: the ``'pid'`` entry of the
            message, the value of the detector channel, and numpy arrays
            with the values of the X and Y BPM channels.
        """
        if self.isAramis:
            stream, ch0 = self.bsAR, self.ARch0
            chx, chy = self.ARchx, self.ARchy
        else:
            stream, ch0 = self.bsAT, self.ATch0
            chx, chy = self.ATchx, self.ATchy
        data = next(stream)
        return (
            data['pid'],
            data[ch0],
            np.array([data[cnl] for cnl in chx]),
            np.array([data[cnl] for cnl in chy]),
        )

    def readPV(self):
        """Return the current values of the active beamline's reference PVs."""
        pvs = self.pvAR if self.isAramis else self.pvAT
        return [pv.value for pv in pvs]

    def setPV(self, fbval):
        """
        Write ``fbval`` element-wise to the active beamline's reference PVs.

        ``fbval[i]`` is written to the i-th PV; a sequence longer than the
        PV list raises ``IndexError``.
        """
        pvs = self.pvAR if self.isAramis else self.pvAT
        for i, val in enumerate(fbval):
            pvs[i].value = val

    def getPVNames(self):
        """Return the names of the active beamline's reference PVs."""
        pvs = self.pvAR if self.isAramis else self.pvAT
        return [pv.pvname for pv in pvs]

    def getDetectorName(self):
        """Return the detector channel name of the active beamline."""
        return self.ARch0 if self.isAramis else self.ATch0

    def getKicker(self):
        """Return the current values of the active beamline's kicker PVs."""
        kickers = self.kickerAR if self.isAramis else self.kickerAT
        return [pv.value for pv in kickers]

    def setKicker(self, vals):
        """
        Write ``vals`` element-wise to the active beamline's kicker PVs.

        ``vals[i]`` is written to the i-th kicker PV; a sequence longer
        than the kicker list raises ``IndexError``.
        """
        kickers = self.kickerAR if self.isAramis else self.kickerAT
        for i, val in enumerate(vals):
            kickers[i].value = val