diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..13ed080 --- /dev/null +++ b/app/__init__.py @@ -0,0 +1,5 @@ +from .adaptiveorbit import AdaptiveOrbit +from .spectralanalysis import SpectralAnalysis +from .hero import LaserPower,EnergyModulation +from .dispersiontools import Dispersion +from .xtcavstabilizer import XTCAVStabilizer diff --git a/app/adaptiveorbit.py b/app/adaptiveorbit.py new file mode 100644 index 0000000..af1f089 --- /dev/null +++ b/app/adaptiveorbit.py @@ -0,0 +1,131 @@ +import time +import numpy as np + +from bstrd import BSCache +from bstrd.bscache import make_channel_config, is_available +from epics import PV + +class AdaptiveOrbit: + """ + Wrapper class to bundle all daq/io needed for adaptive orbit feedback. + """ + def __init__(self): + + # Aramis Channels + self.ARch0 = 'SARFE10-PBIG050-EVR0:CALCI' + self.ARchx = ['SARUN%2.2d-DBPM070:X1' % id for id in range(1,17)] + self.ARchy = ['SARUN%2.2d-DBPM070:Y1' % id for id in range(1,17)] + self.bsAR = self.initBSStream([self.ARch0]+self.ARchx+self.ARchy) + self.pvAR = self.initPV(self.ARchx) + self.kickerAR = self.initPV(['SARMA02-MCRX050:I-SET','SARMA02-MCRY050:I-SET','SARUN02-MCRX080:I-SET','SARUN02-MCRY080:I-SET','SFB_ORBIT_SAR:ONOFF1']) + + # Athos Channels + self.ATch0 = 'SATFE10-PEPG046-EVR0:CALCI' + self.ATchx=[] + self.ATchy=[] + for bpm in range(5,23): + idx = '070' + if bpm == 5 or bpm ==14: + idx='410' + self.ATchx.append('SATUN%2.2d-DBPM%s:X2' % (bpm,idx)) + self.ATchy.append('SATUN%2.2d-DBPM%s:Y2' % (bpm,idx)) + self.bsAT = self.initBSStream([self.ATch0]+self.ATchx+self.ATchy) + self.pvAT = self.initPV(self.ATchx) + self.kickerAT = self.initPV(['SATMA01-MCRX610:I-SET','SATMA01-MCRY610:I-SET','SATUN05-MCRX420:I-SET','SATUN05-MCRY420:I-SET','SFB_ORBIT_SAT:ONOFF1']) + + # select first beamline + self.isAramis = True + + + def initBSStream(self,channels): + print("Initializing BSstream") + bs = BSCache(100000,100000) # 1000 second time out, capazity for 1000 second. + bs.get_vars(channels) + return bs + + def initPV(self,chx): + print("Initializing EPICS Channels") + pvs = [] + for x in chx: + if ':X1' in x: + pvs.append(PV(x.replace(':X1',':X-REF-FB'))) + pvs.append(PV(x.replace(':X1',':Y-REF-FB'))) + elif ':X2' in x: + pvs.append(PV(x.replace(':X2',':X-REF-FB'))) + pvs.append(PV(x.replace(':X2',':Y-REF-FB'))) + else: + pvs.append(PV(x)) + con = [pv.wait_for_connection(timeout=0.2) for pv in pvs] + for i, val in enumerate(con): + if val is False: + name = pv[i].pvname + raise ValueError(f"PV-Channel {name} is not available") + return pvs + + def flush(self): + self.bsAR.flush() + self.bsAT.flush() + + def terminate(self): + print('Stopping BSStream Thread...') + self.bsAR.stop() + self.bsAR.pt.running.clear() # for some reason I have to + self.bsAT.stop() + self.bsAT.pt.running.clear() # for some reason I have to + + def getBeamline(self,beamline): + if beamline == 'Aramis': + self.isAramis = True + else: + self.isAramis = False + + # all routine for accessing the machine (read/write) + + def read(self): + if self.isAramis: + data=self.bsAR.__next__() + return data['pid'],data[self.ARch0],np.array([data[cnl] for cnl in self.ARchx]),np.array([data[cnl] for cnl in self.ARchy]) + data=self.bsAT.__next__() + return data['pid'],data[self.ATch0],np.array([data[cnl] for cnl in self.ATchx]),np.array([data[cnl] for cnl in self.ATchy]) + + + def readPV(self): + if self.isAramis: + return [pv.value for pv in self.pvAR] + return [pv.value for pv in self.pvAT] + + + def setPV(self,fbval): + if self.isAramis: + for i in range(len(fbval)): + self.pvAR[i].value = fbval[i] + else: + for i in range(len(fbval)): + self.pvAT[i].value = fbval[i] + + def getPVNames(self): + if self.isAramis: + return [pv.pvname for pv in self.pvAR] + return [pv.pvname for pv in self.pvAT] + + def getDetectorName(self): + if self.isAramis: + return self.ARch0 + return self.ATch0 + + def getKicker(self): + if self.isAramis: + return [pv.value for pv in self.kickerAR] + return [pv.value for pv in self.kickerAT] + + def setKicker(self,vals): + if self.isAramis: + for i,val in enumerate(vals): + self.kickerAR[i].value = val + return + for i,val in enumerate(vals): + self.kickerAT[i].value = val + + + + diff --git a/app/dispersiontools.py b/app/dispersiontools.py new file mode 100644 index 0000000..c1789a8 --- /dev/null +++ b/app/dispersiontools.py @@ -0,0 +1,316 @@ +import datetime +import re +import numpy as np + +from bsread import dispatcher +import epics + +from slic.core.adjustable import PVAdjustable +from slic.core.acquisition import BSAcquisition, PVAcquisition +from slic.core.scanner import Scanner +from sfbd.ext import CounterAdjustable +from sfbd.ext import BSCAcquisition +from sfbd.interface import SlicScan +from bstrd import BSCache + +def getAux(pvs=None): + if not pvs: + return + ret={} + val = epics.caget_many(pvs) + for i,pv in enumerate(pvs): + if val[i]: # filter out None values + ret[pv]=float(val[i]) + return ret + + +def getBSChannels(regexp): + prog = re.compile(regexp) + res = [] + for bs in dispatcher.get_current_channels(): + if prog.match(bs['name']): + res.append(bs['name']) + return res + +def getRFCalibrationChannels(sensors,energy): + aux=[] + for sen in sensors: + if 'PHASE-VS' in sen: + aux.append(sen.replace('PHASE-VS','GET-VSUM-PHASE-OFFSET').replace('RLLE-DSP','RSYS')) + aux.append(sen.replace('PHASE-VS','GET-VSUM-AMPLT-SCALE').replace('RLLE-DSP','RSYS')) + aux.append(sen.replace('PHASE-VS','SM-GET').replace('RLLE-DSP','RMSM')) + aux.append(energy) + return aux + + +class Dispersion: + def __init__(self, branch = 'Aramis'): + self.scanname = 'Dispersion' + self.branch = None + + dirpath= datetime.datetime.now().strftime('measurements/%Y/%m/%d/slic_sfbd') + self.basedir = '/sf/data/%s/%s' % (dirpath,self.scanname) + self.pgroup='%s/%s' % (dirpath,self.scanname) + self.scandir='/sf/data/'+self.pgroup + + self.setBranch(branch) # enfore Athos dump settings for now + self.Nsteps = 2 + self.Nsamples = 1 + self.bsc = None + + def setBranch(self,branch = 'Aramis'): + self.sc = None + if branch == 'Athos Dump': + self.setupAthosDump() + elif branch == 'Aramis': + self.setupAramis() + elif branch == 'Athos': + self.setupAthos() + elif branch =='Bunch Compressor 2': + self.setupBC2() + else: + self.branch = None + return self.branch + + def setup(self,scl = 1, Nsteps=5, Nsamples=10): + # the setup is done from the main thread. Therefore I have to define the BSC here + self.N = Nsteps + self.Ns= Nsamples + self.scale = scl + # define stream + print('Getting BSCache') + self.bsc = BSCache(100000,10000) # 100 second timeout, size for 100 second data taken + self.bsc.get_vars(self.sensor) # this starts the stream into the cache + print('Getting BSCache done') + + def scan(self): + # core routine to do all action. Will be a thread of the slic scanner wrapper + if not self.branch: + return + + # define acquisition channels wrapper for BSQueue + pgroup = '%s-%s' % (self.pgroup,self.branch) + acq = [BSCAcquisition(".",pgroup, default_channels=[self.bsc])] + + # define the scanner + self.scanner = Scanner(data_base_dir=self.basedir, # root directory of data location e.g. /sf/data/2023/02/02/Dispersion + scan_info_dir=self.basedir, # write also scan info there + make_scan_sub_dir=True, # put each scan in its own directory + default_acquisitions=acq) # adjustable to use + + # define adjustable + self.adj = PVAdjustable(self.adjSV,pvname_readback = self.adjRB, accuracy = 0.1,ID = self.adjSV, name = self.name) + val = self.adj.get_current_value(readback=False) + if self.adj2SV: + self.adj2 = PVAdjustable(self.adj2SV,pvname_readback = self.adj2RB, accuracy = 0.1,ID = self.adjSV, name = self.name) + val2 = self.adj2.get_current_value(readback=False) + dval = self.amp*self.scale + print(self.adjSV,' - Scan Range:',val-dval,'to',val+dval) + if self.adj2SV: + print(self.adj2SV,' - Scan Range:',val2+dval,'to',val2-dval) + + # create scanner backend + if self.adj2SV: + self.sc=self.scanner.a2scan(self.adj, val-dval, val+dval, + self.adj2, val2+dval, val2-dval, + n_intervals = self.N-1, # steps + n_pulses = self.Ns, # samples + filename=self.branch, # relative directory for data + start_immediately = False, # wait for execution to performe pre-action items + return_to_initial_values=True) # return to initial values + else: + self.sc=self.scanner.ascan(self.adj, val-dval, val+dval, + n_intervals = self.N-1, # steps + n_pulses = self.Ns, # samples + filename=self.branch, # relative directory for data + start_immediately = False, # wait for execution to performe pre-action items + return_to_initial_values=True) # return to initial values + + # get aux data first + self.auxdata = getAux(self.aux) + self.preaction() + self.sc.run() + self.postaction() + self.bsc.stop() # stop the queue + + + +################################# +# definition of the individual branches + + def setupBC2(self): + # branch and name tag + self.branch='Bunch Compressor 2' + self.name ='BC2' + + # pre-scan item - needs an adjustment of the dipole current of about 2.3e-3 - to be implemented later. + self.pre = {} + self.pre['SFB_COMPRESSION_BC2_AR:ONOFF1']={'Val':0,'InitVal':0} + self.pre['SFB_COMPRESSION_BC2_AR:ONOFF2']={'Val':0,'InitVal':0} + self.pre['SFB_ORBIT_S10:ONOFF1']={'Val':0,'InitVal':0} + for pv in self.pre.keys(): + self.pre[pv]['adj']=PVAdjustable(pv) + + # adjustable + self.adjSV = 'S10:SET-E-GAIN-OP' + self.adjRB = 'S10:GET-E-GAIN-OP' + self.adj2SV = 'S20:SET-E-GAIN-OP' # counter adjustable + self.adj2RB = 'S20:GET-E-GAIN-OP' + self.amp = 5 # the amplitude of the scan, which can be scaled + + + # acquisition + sensor1 = getBSChannels('S10[BM].*-DBPM.*:[XY]1$') + sensor2 = getBSChannels('S1.*-RLLE-DSP:.*-VS$') + self.sensor = sensor1+sensor2 + + # auxiliar data to be read one + self.aux = getRFCalibrationChannels(sensor2,'SINBC02-MBND100:ENERGY-OP') + + def setupAthosDump(self): + # branch and name tag + self.branch='Athos_Dump' + self.name ='SATCB01-Linac' + + # pre-scan item - needs an adjustment of the dipole current of about 2.3e-3 - to be implemented later. + self.pre = {} + self.pre['SFB_BEAM_DUMP_AT:ONOFF1']={'Val':0,'InitVal':0} + self.pre['SFB_ORBIT_SAT:ONOFF1']={'Val':0,'InitVal':0} + for i in range(1,5): + self.pre['SFB_ORBIT_SAT_%2.2d:ONOFF1' % i ]={'Val':0,'InitVal':0} + for pv in self.pre.keys(): + self.pre[pv]['adj']=PVAdjustable(pv) + + # adjustable + self.adjSV = 'SATCB01-RSYS:SET-BEAM-PHASE' + self.adjRB = 'SATCB01-RSYS:GET-BEAM-PHASE' + self.adj2SV = None + self.adj2RB = None + self.amp = 30 # the amplitude of the scan, which can be scaled + + # acquisition + sensor1 = getBSChannels('SATBD02-DBPM.*:Y2$') + sensor2 = getBSChannels('SATCB.*-RLLE-DSP:.*-VS$') + self.sensor = sensor1+sensor2 + + # auxiliar data to be read one + self.aux = getRFCalibrationChannels(sensor2,'SATCL01-MBND100:ENERGY-OP') + + def setupAramis(self): + # branch and name tag + self.branch='Aramis' + self.name = 'Linac3' + + # pre-scan item + self.pre = {} + self.pre['SFB_BEAM_DUMP_AR:ONOFF1']={'Val':0,'InitVal':0} + self.pre['SFB_BEAM_ENERGY_ECOL_AR:ONOFF1']={'Val':0,'InitVal':0} + self.pre['SFB_ORBIT_S30:ONOFF1']={'Val':0,'InitVal':0} + self.pre['SFB_ORBIT_SAR:ONOFF1']={'Val':0,'InitVal':0} + for pv in self.pre.keys(): + self.pre[pv]['adj']=PVAdjustable(pv) + + # adjustable + self.adjSV = 'S30:SET-E-GAIN-OP' + self.adjRB = 'S30:GET-E-GAIN-OP' + self.adj2SV = None + self.adj2RB = None + self.amp = 20 # the amplitude of the scan, which can be scaled + + # acquisition + sensor1 = getBSChannels('SAR[CMU].*DBPM.*:[XY]1$') + sensor2 = getBSChannels('S[23].*-RLLE-DSP:.*-VS$') + self.sensor = sensor1+sensor2 + + # auxiliar data to be read one + self.aux = getRFCalibrationChannels(sensor2,'S10BC02-MBND100:ENERGY-OP') + + + def setupAthos(self): + # branch and name tag + self.branch='Athos' + self.name = 'Linac2+3' + + # pre-scan item + self.pre = {} + self.pre['SFB_BEAM_DUMP_AT:ONOFF1']={'Val':0,'InitVal':0} + self.pre['SFB_BEAM_ENERGY_ECOL_AT:ONOFF1']={'Val':0,'InitVal':0} + self.pre['SFB_ORBIT_SWY:ONOFF1']={'Val':0,'InitVal':0} + self.pre['SFB_ORBIT_SAT:ONOFF1']={'Val':0,'InitVal':0} + for i in range(1,5): + self.pre['SFB_ORBIT_SAT_%2.2d:ONOFF1' % i ]={'Val':0,'InitVal':0} + for pv in self.pre.keys(): + self.pre[pv]['adj']=PVAdjustable(pv) + + # adjustable + self.adjSV = 'S20:SET-E-GAIN-OP' + self.adjRB = 'S20:GET-E-GAIN-OP' + self.adj2SV = 'S30:SET-E-GAIN-OP' # counter adjustable + self.adj2RB = 'S30:GET-E-GAIN-OP' + self.amp = 5 # the amplitude of the scan, which can be scaled + + # acquisition + sensor1 = getBSChannels('SAT[SDC].*DBPM.*:[XY]2$') + sensor2 = getBSChannels('S[2].*-RLLE-DSP:.*-VS$') + self.sensor = sensor1+sensor2 + + # auxiliar data to be read one + self.aux = getRFCalibrationChannels(sensor2,'S10BC02-MBND100:ENERGY-OP') + + +######################### +# some basic interaction with the scan functionality + + + def preaction(self): + for key in self.pre.keys(): + self.pre[key]['InitVal'] = self.pre[key]['adj'].get_current_value(readback = False) + self.pre[key]['adj'].set_target_value(self.pre[key]['Val']) + + def postaction(self): + for key in self.pre.keys(): + self.pre[key]['adj'].set_target_value(self.pre[key]['InitVal']) + + def stop(self): + if self.sc is None: + return + self.sc.stop() + + def running(self): + if self.sc is None: + return False + return self.sc.running + + def status(self): + if self.sc is None: + return 0,0 + si = self.sc.scan_info.to_dict() + steps = 0 + if 'scan_values' in si: + steps=len(si['scan_values']) + return steps,self.N + + def info(self): + if self.sc is None: + return None + return self.sc.scan_info.to_dict() + + +#-------------------- +# main implementation for debugging + + +if __name__ == '__main__': + daq = Dispersion() + daq.setup(1,10,100) +# threaded execution + scanner=SlicScan() + scanner.start(daq,True) + + + + + + + + diff --git a/app/hero.py b/app/hero.py new file mode 100644 index 0000000..49451b3 --- /dev/null +++ b/app/hero.py @@ -0,0 +1,115 @@ +import datetime +import numpy as np + +from slic.core.acquisition import PVAcquisition +from slic.core.acquisition import BSAcquisition +from slic.core.adjustable import PVAdjustable +from slic.devices.general import motor +from slic.core.scanner import Scanner +from sfbd.ext import CamAcquisition + +# some temporary wrapper +class PollingPVAcquisition(PVAcquisition): + def _acquire(self, *args, polling=True, **kwargs): + return super()._acquire(*args, polling=polling, **kwargs) + +class LaserScanBase: + def __init__(self): + print('Init Base Class') + self.SV= 'SSL-LMOT-M1104:MOT' + self.pol = motor.Motor(self.SV) +# self.pol = PVAdjustable(self.SV) + + def stop(self): + if self.sc is None: + return + self.sc.stop() + + def running(self): + return self.sc.running + + def status(self): + si = self.sc.scan_info.to_dict() + steps = 0 + if 'scan_values' in si: + steps=len(si['scan_values']) + return steps,self.N + + def info(self): + return self.sc.scan_info.to_dict() + + def setup(self,amax=45,Nsteps=5,Nsamples=5): + amin = 0 + self.N = Nsteps + self.Ns= Nsamples + amin = 15 + amax = 22 + self.values=np.linspace(amin,amax,num=self.N) # needs a change + +# measuring the pulse energy as a function of the controling PV. Note that the power should be limited to 300 uJ +# thus limiting the value of the actuaor defining the lase rpulse energy in the EnergyModulaiton class. + +class LaserPower(LaserScanBase): + def __init__(self): + super(LaserPower,self).__init__() + + self.scanname = 'HEROLaserEnergy' + dirpath= datetime.datetime.now().strftime('/sf/data/measurements/%Y/%m/%d/slic_sfbd') + self.scandir='%s/%s' % (dirpath,self.scanname) + + self.RB = 'SSL-LENG-SLNK1:VAL_GET' + self.erg = PVAcquisition("machine","sfbd", default_channels=[self.RB]) + + self.scanner = Scanner(data_base_dir=self.scandir,scan_info_dir=self.scandir,make_scan_sub_dir=True, + default_acquisitions=[self.erg]) + + def scan(self): + self.sc=self.scanner.ascan_list(self.pol,self.values, + filename=self.scanname,start_immediately = False, + n_pulses=self.Ns,return_to_initial_values=True) + self.sc.run() + + + + +# measuring the coherent emission/space charge blow-up as a function of the hero energy modulation + +class EnergyModulation(LaserScanBase): + def __init__(self, acq = 0): + super(EnergyModulation,self).__init__() + self.scanname = 'HEROEnergyModulation' + dirpath= datetime.datetime.now().strftime('/sf/data/measurements/%Y/%m/%d/slic_sfbd') + self.scandir='%s/%s' % (dirpath,self.scanname) + + self.acq = acq + if self.acq == 0: + self.RB ='SATFE10-PEPG046-EVR0:CALCI' + self.erg = BSAcquisition("machine","sfbd", default_channels=[self.RB]) + elif self.acq == 1: + self.RB ='SATBD02-DBPM040:Y2' + self.erg = BSAcquisition("machine","sfbd", default_channels=[self.RB]) + elif self.acq == 2: + self.RB = 'SATBD01-DSCR210' + self.erg = CamAcquisition("machine","sfbd", default_channels=[self.RB]) + self.erg.getConnection(self.RB) + else: + self.RB = 'SATBD02-DSCR050' + self.erg = CamAcquisition("machine","sfbd", default_channels=[self.RB]) + self.erg.getConnection(self.RB) + + self.scanner = Scanner(data_base_dir=self.scandir,scan_info_dir=self.scandir,make_scan_sub_dir=True, + default_acquisitions=[self.erg]) + + def scan(self): + self.sc=self.scanner.ascan_list(self.pol,self.values, + filename=self.scanname,start_immediately = False, + n_pulses=self.Ns,return_to_initial_values=True) + self.sc.run() + + + + + + + + diff --git a/app/runDispersionTools.sh b/app/runDispersionTools.sh new file mode 100755 index 0000000..973f8ad --- /dev/null +++ b/app/runDispersionTools.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +export PYTHONPATH=/sf/bd/packages/slic:/sf/bd/packages/bstrd:$PYTHONPATH +python dispersiontools.py + diff --git a/app/spectralanalysis.py b/app/spectralanalysis.py new file mode 100644 index 0000000..18ce27c --- /dev/null +++ b/app/spectralanalysis.py @@ -0,0 +1,49 @@ +import time +import numpy as np + +from bstrd import BSCache +from epics import PV + +class SpectralAnalysis: + """ + Wrapper class to bundle all daq/io needed for adaptive orbit feedback. + """ + def __init__(self): + + self.bs = BSCache(100000,10000) # 100 second timeout, size for 100 second data taken + self.bs.stop() + + self.channel = '' + self.channels = ['SARFE10-PSSS059:SPECTRUM_Y', + 'SATOP21-PMOS127-2D:SPECTRUM_Y', + 'SATOP31-PMOS132-2D:SPECTRUM_Y'] + self.isConnected = False + + def connect(self,ich): + if ich < 0 or ich >= len(self.channels): + return False + self.channel = self.channels[ich] + print('Connecting to BS-Channel:',self.channel) + self.bs.channels.clear() + self.bs.get_var(self.channel) # this starts the stream into the cache + self.pv = PV(self.channel.replace('_Y','_X')) + + def terminate(self): + print('Stopping BSStream Thread...') + self.bs.stop() + self.bs.pt.running.clear() # for some reason I have to + + def flush(self): + self.bs.flush() + + def read(self): + data=self.bs.__next__() + return data['pid'],data[self.channel] + + def readPV(self): + return self.pv.value + + def getSpectrometerName(self): + return self.channel + + diff --git a/app/xtcavstabilizer.py b/app/xtcavstabilizer.py new file mode 100644 index 0000000..f053977 --- /dev/null +++ b/app/xtcavstabilizer.py @@ -0,0 +1,51 @@ +import time +import numpy as np + +from bstrd import BSCache +from epics import PV + + +class XTCAVStabilizer: + """ + Wrapper class to bundle all daq/io needed for stabilizing the XTCAV + """ + def __init__(self): + + # the PV + self.PVTCAV = PV('SATMA02-RMSM:SM-GET',connection_timeout=0.9, callback=self.stateChanged) + self.state=self.PVTCAV.value == 9 + self.PVVolt = PV('SATMA02-RSYS:SET-ACC-VOLT') + self.PVPhase = PV('SATMA02-RSYS:SET-BEAM-PHASE') + + # the BS channels + self.bs = BSCache(100000,10000) # 100 second timeout, size for 100 second data taken + self.channels = ['SATBD02-DBPM040:X2','SATMA02-RLLE-DSP:PHASE-VS','SATBD02-DBPM040:X2-VALID'] + self.validation = self.channels[2] + self.bs.get_vars(self.channels) # this starts the stream into the cache + self.bs.stop() + + def getPhase(self): + return self.PVPhase.value + + def setPhase(self,phi): + self.PVPhase.set(phi) + + def getVoltage(self): + return self.PVVolt.value + + def stateChanged(self,pvname=None, value=0, **kws): + self.state = value == 9 + + def terminate(self): + print('Stopping BSStream Thread...') + self.bs.stop() + self.bs.pt.running.clear() # for some reason I have to + + def flush(self): + self.bs.flush() + + def read(self): + return self.bs.__next__() + + + diff --git a/ext/__init__.py b/ext/__init__.py new file mode 100644 index 0000000..976402d --- /dev/null +++ b/ext/__init__.py @@ -0,0 +1,4 @@ +from .magnet import Magnet +from .camacquisition import CamAcquisition +from .counteradjustable import CounterAdjustable +from .bscacquisition import BSCAcquisition diff --git a/ext/bscacquisition.py b/ext/bscacquisition.py new file mode 100644 index 0000000..837ac55 --- /dev/null +++ b/ext/bscacquisition.py @@ -0,0 +1,40 @@ + +import h5py +import numpy as np + +from slic.core.acquisition.acquisition import Acquisition + + +# class using the BSQueue to avoid to reestablish a stream for each step. + +class BSCAcquisition(Acquisition): + + def _acquire(self, filename, channels=None, data_base_dir=None, scan_info=None, n_pulses=100, **kwargs): + + queue =channels[0] # abusing interface since BSAcquisition assume a list of channels + + # allocating space + data={} + chns = queue.channels + for chn in chns: + data[chn]={'data':np.zeros((n_pulses))} + data['pulse_id']=np.zeros((n_pulses)) + + # clear the queue + queue.flush() + for i in range(n_pulses): + msg = queue.__next__() # pull data from cache + for chn in chns: + data[chn]['data'][i] = msg[chn] + data['pulse_id'][i]=msg['pid'] + + # write out the data file + hid = h5py.File(filename,'w') + hid.create_dataset('pulse_id', data = data['pulse_id']) + for chn in chns: + gid = hid.create_group(chn) + for key in data[chn].keys(): + gid.create_dataset(key, data = data[chn][key]) + hid.close() + + diff --git a/ext/camacquisition.py b/ext/camacquisition.py new file mode 100644 index 0000000..1f8d487 --- /dev/null +++ b/ext/camacquisition.py @@ -0,0 +1,41 @@ +from time import sleep +from tqdm import trange +import h5py + +from cam_server_client import PipelineClient +from cam_server_client.utils import get_host_port_from_stream_address +from bsread import source, SUB + + +from slic.core.acquisition.acquisition import Acquisition +class CamAcquisition(Acquisition): + + def getConnection(self,cam): + pipeline_client = PipelineClient() + cam_instance_name = str(cam) + "_sp1" + stream_address = pipeline_client.get_instance_stream(cam_instance_name) + self.host, self.port = get_host_port_from_stream_address(stream_address) + print(self.host,self.port) + + def _acquire(self, filename, channels=None, data_base_dir=None, scan_info=None, n_pulses=100, **kwargs): + print("my routine") + print("extra kwargs:", kwargs) + args = (filename, n_pulses, channels) + args = ", ".join(repr(i) for i in args) + print("acquire({})".format(args)) + print(f"dummy acquire to {filename}:") + +# stream_host,stream_port = getPipeLine(channels[0]) +# time.wait(1) + data= [] + with source(host=self.host, port=self.port, mode=SUB) as input_stream: + input_stream.connect() + for i in range(n_pulses): + print('Camera Images', i) + message = input_stream.receive() + data.append(message.data.data) + hid = h5py.File(filename,'w') + gid = hid.create_group(channels[0]) + for key in data[0].keys(): + gid.create_dataset(key, data = [rec[key].value for rec in data]) + hid.close() diff --git a/ext/counteradjustable.py b/ext/counteradjustable.py new file mode 100644 index 0000000..990a733 --- /dev/null +++ b/ext/counteradjustable.py @@ -0,0 +1,23 @@ +from slic.core.adjustable import Adjustable + +class CounterAdjustable(Adjustable): + def __init__(self, adjustable1, adjustable2): + self.adj1=adjustable1 + self.adj2=adjustable2 + self.ref_values() # implementation needs reference values to convert absolute scan to relative scan + + def ref_value(self): + self.val1 = self.adj1.get_current_value(readback = False) + self.val2 = self.adj2.get_current_value(readback = False) + + def set_target_value(self, value): + t1 = self.adj1.set_target_value(self.val1 + value) + t2 = self.adj2.set_target_value(self.val2 - value) + t1.wait() + t2.wait() + + def get_current_value(self): + return self.adj1.get_current_value() + + def is_moving(self): + return any([self.adj1.is_moving(),self.adj2.is_moving()]) diff --git a/ext/magnet.py b/ext/magnet.py new file mode 100644 index 0000000..2243b39 --- /dev/null +++ b/ext/magnet.py @@ -0,0 +1,21 @@ +from slic.core.adjustable import PVAdjustable +from slic.utils import typename + +class Magnet(PVAdjustable): + + def __init__(self,name): + self.name=name + pvsv='%s:I-SET' % name + pvrb='%s:I-READ' % name + tol = 0.075 + super().__init__(pvsv,pvname_readback=pvrb,accuracy=tol,internal=True) + + + + @property + def status(self): + return "Cycling" + + def __repr__(self): + tn = typename(self) + return f"{tn} \"{self.name}\" is {self.status}" diff --git a/interface/__init__.py b/interface/__init__.py new file mode 100644 index 0000000..8f3efde --- /dev/null +++ b/interface/__init__.py @@ -0,0 +1,6 @@ +from .snap import getSnap +from .snap import saveSnap +from .save import saveDataset +from .load import loadDataset +from .elog import writeElog +from .slic import SlicScan diff --git a/interface/elog.py b/interface/elog.py new file mode 100644 index 0000000..7dbd84c --- /dev/null +++ b/interface/elog.py @@ -0,0 +1,25 @@ +import os +import elog + +def writeElog(text, Title = 'Test', Application = 'SFBD-Module', Attachment = None): + """ + Generates an entry in the electronic logbook of SwissFEL Commisisoning Data + :param text: The text to be placed in the log book + :param Title: Title of the log book entry + :param Application: Name of application which generates the log book entry + :param Attachment: List of attachments to be added to the log book (mostly plots) + :return: Message ID of log book entry + """ + + # supplemental info + Author = os.getlogin() + Category = 'Measurement' # Info or Measurement + System = 'Beamdynamics' # Beamdynamics, Operation, Controls + + dict_att = {'Author': Author, 'Application': Application, 'Category': Category, 'Title': Title, 'System': System} + + logbook = elog.open('https://elog-gfa.psi.ch/SwissFEL+commissioning+data/', user='robot', password='robot') + return logbook.post(text, attributes=dict_att, attachments=Attachment) + + + diff --git a/interface/load.py b/interface/load.py new file mode 100644 index 0000000..748aecb --- /dev/null +++ b/interface/load.py @@ -0,0 +1,58 @@ +import h5py + + +def loadDataset(filename): + hid = h5py.File(filename, "r") + snap = loadSnap(hid) + data = loadData(hid) + act = loadActuator(hid) + hid.close() + return data,act,snap + +def loadSnap(hid): + snap={} + if not 'experiment' in hid.keys(): + return None + for key1 in hid['experiment'].keys(): + if isinstance(hid['experiment'][key1],h5py.Group): + for key2 in hid['experiment'][key1].keys(): + val = hid['experiment'][key1][key2][()] + snap[key1+':'+key2]=val + else: + snap[key1]=hid['experiment'][key1][()] + return snap + +def loadData(hid,scanrun=1): + run='scan_%d' % scanrun + data = {} + for key1 in hid[run]['data'].keys(): + if isinstance(hid[run]['data'][key1],h5py.Group): + for key2 in hid[run]['data'][key1].keys(): + val = hid[run]['data'][key1][key2][()] + data[key1+':'+key2]=val + else: + data[key1]=hid[run]['data'][key1][()] + return data + +def loadActuator(hid,scanrun=1): + run='scan_%d' % scanrun + data = {} + if 'actuators' in hid[run]['method'].keys(): + for key1 in hid[run]['method']['actuators'].keys(): + if isinstance(hid[run]['method']['actuators'][key1],h5py.Group): + for key2 in hid[run]['method']['actuators'][key1].keys(): + val = hid[run]['method']['actuators'][key1][key2][()] + data[key1+':'+key2]={'val':val} + else: + data[key1]=hid[run]['method']['actuators'][key1][()] + return data + + + + + + + + + + diff --git a/interface/save.py b/interface/save.py new file mode 100644 index 0000000..934c036 --- /dev/null +++ b/interface/save.py @@ -0,0 +1,189 @@ +import sys +import os +import datetime +import h5py +import socket +from PIL import Image + +def getDatasetFileName(program='Unknown'): + year = datetime.datetime.now().strftime('%Y') + month = datetime.datetime.now().strftime('%m') + day = datetime.datetime.now().strftime('%d') + + path = '/sf/data/measurements/%s' % year + if not os.path.exists(path): + os.makedirs(path) + path = '%s/%s' % (path,month) + if not os.path.exists(path): + os.makedirs(path) + path = '%s/%s' % (path,day) + if not os.path.exists(path): + os.makedirs(path) + datetag = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') + filename=('%s/%s_%s' % (path, program.replace(' ','_'), datetag)) + return filename + + +def saveDataset(program,data,actuator=None,snap=None,analysis=None,figures=None): + hid,filename = openDataset(program) + if not hid: + return None + # check if scan is multiple instances of a scan + if isinstance(data,list): + for iscan,singledata in enumerate(data): + writeData(hid,singledata,iscan) + else: + writeData(hid,data,1) + # same for actuator + if isinstance(actuator,list): + for iscan,singleactuator in enumerate(actuator): + writeActuator(hid,singleactuator,iscan) + else: + writeActuator(hid,actuator,1) + # and same for analysis + if isinstance(analysis,list): + for iscan,singleana in enumerate(analysis): + writeAnalysis(hid,singleana,iscan) + else: + writeAnalysis(hid,analysis,1) + # write aux data + writeSnap(hid,snap) + hid.close() + + if figures: + writeFigure(filename,figures) + return filename + + +def openDataset(program): + if isinstance(program,str): + program={'Name':program,'Author':'Unknown','Version':'Unknown'} + if not isinstance(program,dict): + return None,None + if not 'Author' in program.keys(): + program['Author']='Unknown' + if not 'Version' in program.keys(): + program['Version']='Unknown' + filename=getDatasetFileName(program['Name']) + hid= h5py.File(filename+'.h5', "w") + + # meta data header + dt = h5py.special_dtype(vlen=bytes) + dset=hid.create_dataset('general/user',(1,),dtype=dt) + dset[0]=os.getlogin() + dset=hid.create_dataset('general/application',(1,),dtype=dt) + dset[0]=program['Name'] + dset=hid.create_dataset('general/author',(1,),dtype=dt) + dset[0]=program['Author'] + dset=hid.create_dataset('general/version',(1,),dtype=dt) + dset[0]=program['Version'] + dset=hid.create_dataset('general/created',(1,),dtype=dt) + dset[0]=str(datetime.datetime.now()) + return hid,filename + + + +def writeData(hid, data, scanrun=1): + # write the sensor raw value + for ele in data.keys(): + name=ele.split(':') + if len(name)>1: + dset=hid.create_dataset('scan_%d/data/%s/%s' % (scanrun, name[0], name[1]), data=data[ele]) + else: + dset=hid.create_dataset('scan_%d/data/%s' % (scanrun, name[0]), data=data[ele]) + dset.attrs['system'] = getDatasetSystem(name[0]) + dset.attrs['units'] = 'unknown' + # this part is obsolete - dimension should be given from the individual datasets + if not 'pid' in data.keys(): + return + shape = data['pid'].shape + ndim = len(shape) + nsam = shape[-1] + nrec = 0 + if ndim > 1: + nrec = shape[:-1][0] + hid.create_dataset("scan_%d/method/records" % scanrun,data=[nrec]) + hid.create_dataset("scan_%d/method/samples" % scanrun,data=[nsam]) + hid.create_dataset("scan_%d/method/dimension" % scanrun,data=[ndim]) + hid.create_dataset("scan_%d/method/reducedData" % scanrun,data=[0]) # indicating that there is at least a 2D array for scalar data + + +def writeActuator(hid,act,scanrun=1): + if not act: + return + dt = h5py.special_dtype(vlen=bytes) + dset=hid.create_dataset("scan_%d/method/type" % scanrun,(1,),dtype=dt) + nact = len(act.keys()) + if nact>0: + dset[0]='Scan' + else: + dset[0]='Time Recording' + for ele in act.keys(): + name=ele.split(':') + if len(name)>1: + dset=hid.create_dataset("scan_%d/method/actuators/%s/%s" % (scanrun,name[0],name[1]),data=act[ele]) + else: + dset=hid.create_dataset("scan_%d/method/actuators/%s" % (scanrun,name[0]),data=act[ele]) + dset.attrs['system']=getDatasetSystem(name[0]) + dset.attrs['units']='unknown' + +def writeSnap(hid,val): + if not val: + return + for key in val.keys(): + name=key.split(':') + if len(name)>1: + dset=hid.create_dataset('experiment/%s/%s' % (name[0],name[1]),data=val[key]) + else: + dset=hid.create_dataset('experiment/%s/%s' % (name[0]),data=val[key]) + dset.attrs['system']=getDatasetSystem(name[0]) + dset.attrs['units']='unknown' + + +def writeAnalysis(hid,data,scanrun=1): + if not data: + return + for key in data.keys(): + name=key.split(':') + if len(name)>1: + dset=hid.create_dataset('scan_%d/analysis/%s/%s' % (scanrun, name[0], name[1]), data=data[key]) + else: + dset=hid.create_dataset('scan_%d/analysis/%s/%s' % (scanrun, name[0]), data=data[key]) + dset.attrs['system']='analysis' + dset.attrs['units']='unknown' + +def writeFigure(filename,figs): + for i,ele in enumerate(figs): + if ele == None: + continue + plotname='%s_Fig%d.png' % (filename,(i+1)) + im = Image.open(ele) + im.save(plotname) + return None + +def getDatasetSystem(name): + if len(name) > 11: + tag=name[8:9] + fulltag=name[8:12] + else: + tag='' + fulltag='' + sys='Unknown' + if tag =='P': + sys='Photonics' + if tag =='D': + sys='Diagnostics' + if fulltag =='DSCR': + sys='Camera' + if tag == 'R': + sys='RF' + if tag == 'M': + sys='Magnets' + if tag == 'U': + sys='Undulator' + return sys + + + + + diff --git a/interface/slic.py b/interface/slic.py new file mode 100644 index 0000000..a37b814 --- /dev/null +++ b/interface/slic.py @@ -0,0 +1,98 @@ +import h5py +import numpy as np +import time +from threading import Thread +from PyQt5.QtCore import QObject, pyqtSignal + +# to do: +# 1 - check if scan thread is running +# 2 - import of BSread data + +from sfbd.interface import getSnap + +class SlicScan(QObject): + + siginc = pyqtSignal(int, int) # signal for increment + sigterm = pyqtSignal(int) # signal for termination + sigsnap = pyqtSignal(bool) + + def __init__(self): + QObject.__init__(self) + self.clear() + + def clear(self): + self.daq = None + self.data = None + self.act = None + self.snap = None + self.doSnap = False + + def start(self,daq,snap=False): + self.clear() + self.doSnap = snap + self.daq=daq + Thread(name='Scan-Monitor',target=self.Tmonitor).start() + + def Tmonitor(self): + mythread = Thread(name='Slic-Scanner',target=self.Tscanner).start() + time.sleep(1) + ostep = -1 + while(self.daq.running()): + istep,nstep=self.daq.status() + if istep>ostep: + ostep=istep + self.siginc.emit(istep,nstep) + time.sleep(1) + if not mythread == None: # wait till scanning thread is done + mythread.join() + istep,nstep=self.daq.status() + self.siginc.emit(istep,nstep) + self.data,self.act = importSlicScan(self.daq.info()) + if hasattr(self.daq,'auxdata'): + self.data.update(self.daq.auxdata) + self.sigterm.emit(istep==nstep) + # if requested add snapshot acquisition + if self.doSnap: + self.startSnap() + + def Tscanner(self): + self.daq.scan() + + def startSnap(self): + Thread(name='Snap-Acquisition',target=self.Tsnap).start() + + def Tsnap(self): + self.snap = getSnap() + print('Acquired snap') + self.sigsnap.emit(True) + + def stop(self): + self.daq.stop() + + +def importSlicScan(scan_info): + if not isinstance(scan_info,dict): + return None,None + if not 'scan_files' in scan_info.keys(): + return None,None + sfiles = scan_info['scan_files'] + data = {} + for istep, sfile in enumerate(sfiles): + hid = h5py.File(sfile[0],'r') + for name, h5obj in hid.items(): + if isinstance(h5obj,h5py.Dataset): # pv channels + data[name] = addDatasetToData(data,name,h5obj) + elif isinstance(h5obj,h5py.Group): # bs read channels + if 'data' in h5obj: + data[name] = addDatasetToData(data,name,h5obj['data']) + actuator = {} + name = scan_info['scan_parameters']['name'][0] + actuator[name]=np.array(scan_info['scan_values']) + data[name]=np.array(scan_info['scan_readbacks']) + return data,actuator + +def addDatasetToData(data,name,h5obj): + if not name in data: + return np.array([h5obj[()]]) + else: + return np.append(data[name],np.array([h5obj[()]]),axis=0) diff --git a/interface/snap.py b/interface/snap.py new file mode 100644 index 0000000..714504b --- /dev/null +++ b/interface/snap.py @@ -0,0 +1,82 @@ +import numpy as np +import yaml +import os +import datetime +import epics + +# things to do: +# 1. Read a snapshot file (not request file) +# 2. add parameters and performance channels (e.g. AT photon energies) + +def parseSnapShotReqYAML(filename): +# read the snapshot request file +# returns a list of PV names + if not filename: + filename = '/sf/data/applications/snapshot/req/op/SF_settings.yaml' + pvs = [] + path = os.path.dirname(filename) + with open(filename) as f: + try: + content = yaml.load(f, Loader=yaml.SafeLoader) + if 'include' in content.keys(): + if len(content['include']) > 0: + for cont in content['include']: + retpv = parseSnapShotReqYAML(path+'/'+cont['name']) + if 'macros' in cont.keys(): + retpv = applyReqMacro(retpv,cont['macros']) + pvs = pvs + retpv + if 'pvs' in content.keys(): + if 'list' in content['pvs']: + for pv in content['pvs']['list']: + pvs.append(pv['name']) + return pvs + return None + except yaml.YAMLError as e: + print(e) + return None + return None + +def applyReqMacro(pvs_in,macros): + pvs = [] + for macro in macros: + for key in macro: + tag='$('+key+')' + for pv in pvs_in: + if tag in pv: + pvs.append(pv.replace(tag,macro[key])) + for pv in pvs_in: # copy the ones without macro + if not '$(' in pv: + pvs.append(pv) + return pvs + +def getSnap(pvs=None): + if not isinstance(pvs,list): + pvs = parseSnapShotReqYAML(pvs) + if not pvs: + return + ret={} + val = epics.caget_many(pvs) + for i,pv in enumerate(pvs): + if val[i]: # filter out None values + ret[pv]=float(val[i]) +# epics.ca.clear_cache() + return ret + +def saveSnap(pvs={},label="", comment = "generated by application"): + filename = datetime.datetime.now().strftime('/sf/data/applications/snapshot/SF_settings_%Y%m%d_%H%M%S.snap') + with open(filename,'w') as fid: + fid.write('#{"labels":["%s"],"comment":"%s", "machine_parms":{}, "save_time": 0.0, "req_file_name": "SF_settings.yaml"}\n' % (label,comment)) + for key in pvs.keys(): + if isinstance(pvs[key],int): + fid.write('%s,{"val": %d}\n' % (key,pvs[key])) + elif isinstance(pvs[key],float): + fid.write('%s,{"val": %f}\n' % (key,pvs[key])) + elif isinstance(pvs[key],str): + fid.write('%s,{"val": %s}\n' % (key,pvs[key])) + + + + + + + diff --git a/template/ServerTemplate.py b/template/ServerTemplate.py new file mode 100644 index 0000000..69bc95c --- /dev/null +++ b/template/ServerTemplate.py @@ -0,0 +1,33 @@ + +import signal + +import ServerBase + +class ServerTemplate(ServerBase.ServerBase): + def __init__(self, PVroot = 'MyServer', debug = False): + self.version='1.0.0' + self.program ='Server Template' + super(ServerTemplate, self).__init__(PVroot,debug,'127.0.0.1', 5678) # last too numbers are the IP adress and port of watchdog + + # connect to the individual handler, which must have the function terminate() implemented + # each handler should be their own thread to not interfere with the main process-loop + self.handler={} + + def terminateSubThreads(self): + for subserver in self.handler.keys(): + self.handler[subserver].terminate() + +if __name__ == '__main__': + + debug = True + server = ServerTemplate('SF-BC-SERVER', debug) + signal.signal(signal.SIGTERM,server.terminate) + try: + server.run() + except KeyboardInterrupt: + server.terminate(None,None) + + + + + diff --git a/util/__init__.py b/util/__init__.py new file mode 100644 index 0000000..7d06df8 --- /dev/null +++ b/util/__init__.py @@ -0,0 +1,4 @@ +from .zmqbase import ZMQBase +from .serverbase import ServerBase + + diff --git a/util/serverbase.py b/util/serverbase.py new file mode 100644 index 0000000..80bc3e4 --- /dev/null +++ b/util/serverbase.py @@ -0,0 +1,92 @@ +import sys +import signal +import os +import socket +import logging +import logging.handlers +from logging.handlers import RotatingFileHandler +from datetime import datetime +import time + +from epics import PV +sys.path.append('/sf/bd/packages/sfbd') +from sfbd.util import ZMQBase + +class ServerBase(ZMQBase): + def __init__(self, root = 'MyServer', debug = False, WDServer = '127.0.0.1', WDPort = 5678): + + super(ServerBase,self).__init__(WDServer,WDPort) + + self.debug = debug + self.root = root + self.suffix='' + if self.debug: + self.suffix='-SIMU' + self.host = socket.gethostname() + self.pid = os.getpid() # process ID + + # enabling logging + self.logfilename="/sf/data/applications/BD-SERVER/%s.log" % self.root + handler = RotatingFileHandler(filename=self.logfilename, + mode='a', + maxBytes=5 * 1024 * 1024, + backupCount=1, + delay=0) + if self.debug: + logging.basicConfig(level=logging.INFO, + format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s', + datefmt='%m-%d %H:%M:%S') + else: + logging.basicConfig(level=logging.INFO, + format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s', + datefmt='%m-%d %H:%M:%S', + handlers=[handler,]) + self.logger=logging.getLogger(self.program) + + # setting up ZMQ interface + self.ZMQServerInfo(self.root,self.host,self.pid) + + # individual channels of main thread + self.PVstop = PV('%s:STOP%s' % (self.root,self.suffix)) + self.PVstop.value = 0 + self.PVstop.add_callback(self.stop) + self.PVping = PV('%s:PING%s' % (self.root,self.suffix)) + self.PVlog = PV('%s:LOG%s' % (self.root,self.suffix)) + + def stop(self,pvname=None,value=None,**kws): + if value > 0: + self.logger.info('PV:STOP triggered at %s' % datetime.now().strftime('%Y-%m-%d %H:%M:%S')) + self.running=False + + def start(self): + self.logger.info('Starting Server: %s at %s' % (self.root,datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + self.logger.info('PV Root: %s' % self.root) + self.logger.info('Version: %s' % self.version) + self.logger.info('Host: %s' % self.host) + self.logger.info('PID: %d' % self.pid) + if self.debug: + self.logger.info('Debug Mode') + + def run(self): + self.start() + self.running=True + while self.running: + time.sleep(1) + if self.ZMQPoll(): + self.logger.info('Watchdog Server requested termination') + self.running = False + self.PVping.value = datetime.now().strftime('Last active at %Y-%m-%d %H:%M:%S') + self.terminate(None,None) + + def terminate(self,signum,frame): + # stopping any sub thread with the server specific function terminateSubThreads + self.terminateSubThreads() + self.logger.info('Terminating Server at %s' % datetime.now().strftime('%Y-%m-%d %H:%M:%S')) + self.ZMQPoll('quit') # informing the watchdog + print('Bunch Compressor Server is quitting...') + sys.exit(0) + + + + + diff --git a/util/zmqbase.py b/util/zmqbase.py new file mode 100644 index 0000000..1e883d3 --- /dev/null +++ b/util/zmqbase.py @@ -0,0 +1,66 @@ +import zmq +import socket +import sys + +class ZMQBase: + def __init__(self, host='127.0.0.1',port = 5678): + + self.host=host + self.port = port + self.msg={'action':'','PV':'','host':'','pid':0} + + self.REQUEST_TIMEOUT = 500 + self.REQUEST_RETRIES = 2 + self.SERVER_ENDPOINT = "tcp://%s:%d" % (host,port) + self.serverIsOffline=False # assume that it is online + + def ZMQServerInfo(self,PVroot,host,pid): + self.msg['PV']=PVroot + self.msg['host']=host + self.msg['pid']=pid + + + def ZMQPoll(self,tag='ping'): + self.msg['action']=tag + context = zmq.Context() + client = context.socket(zmq.REQ) + client.connect(self.SERVER_ENDPOINT) + client.send_pyobj(self.msg) + + retries_left = self.REQUEST_RETRIES + while True: + if (client.poll(self.REQUEST_TIMEOUT) & zmq.POLLIN) != 0: + reply = client.recv_pyobj() + check = self.ZMQIdentifyReply(reply) + if self.serverIsOffline: + self.logger.info("Watchdog server came online") + self.serverIsOffline=False + if check: + return (reply['action'] == 'quit') + else: + self.logger.warning("Malformed reply from server") + continue + + retries_left -= 1 + # Socket is confused. Close and remove it. + client.setsockopt(zmq.LINGER, 0) + client.close() + if retries_left == 0: + if not self.serverIsOffline: + self.logger.info("Watchdog server seems to be offline") + self.serverIsOffline=True + return False + + # Create new connection + client = context.socket(zmq.REQ) + client.connect(self.SERVER_ENDPOINT) + client.send_pyobj(self.msg) + + + def ZMQIdentifyReply(self,reply): + for field in ['PV','host','pid']: + if not reply[field] == self.msg[field]: + return False + return True + +