import time import numpy as np from sfbd.ext.reichebscombined import ReicheBSCombined from slic.core.adjustable import PVAdjustable class AdaptiveOrbit: """ Wrapper class to bundle all daq/io needed for adaptive orbit feedback. """ def __init__(self,beamline='Aramis'): self.beamline=None self.sensor = None self.actuator = None self.getBeamline(beamline) def read(self): if not self.sensor or not self.beamline: return None data=self.sensor.get_current_value() if not data: return None retval={'pid':data['pid'], 'Signal':data[self.channel0[0]], 'X':np.array([data[i] for i in self.channelX]), 'Y':np.array([data[i] for i in self.channelY])} return retval def read_adj(self): if not self.actuator or not self.beamline: return None return [pv.get_current_value() for pv in self.actuator] def getBeamline(self,beamline='Aramis'): if beamline.upper() == 'ARAMIS': self.beamline='Aramis' self.channelX=['SARUN%2.2d-DBPM070:X1' % id for id in range(1,17)] self.channelY=['SARUN%2.2d-DBPM070:Y1' % id for id in range(1,17)] self.channel0 = ['SARFE10-PBIG050-EVR0:CALCI'] FB_bpms = [x.replace(':X1',':X-REF-FB') for x in self.channelX] + [x.replace(':Y1',':Y-REF-FB') for x in self.channelY] elif beamline.upper() == 'ATHOS': self.beamline=None else: self.beamline=None return channels=self.channel0+self.channelX+self.channelY self.sensor = ReicheBSCombined('AdapiveOrbit',channels) self.actuator = [PVAdjustable(pv) for pv in FB_bpms]