6 Commits

11 changed files with 436 additions and 99 deletions

View File

@ -27,11 +27,11 @@ class AdaptiveOrbit:
idx = '070'
if bpm == 5 or bpm ==14:
idx='410'
self.ATchx.append('SATUN%2.2d-DBPM%s:X1' % (bpm,idx))
self.ATchy.append('SATUN%2.2d-DBPM%s:Y1' % (bpm,idx))
self.ATchx.append('SATUN%2.2d-DBPM%s:X2' % (bpm,idx))
self.ATchy.append('SATUN%2.2d-DBPM%s:Y2' % (bpm,idx))
self.bsAT = self.initBSStream([self.ATch0]+self.ATchx+self.ATchy)
self.pvAT = self.initPV(self.ATchx)
self.kickerAR = self.initPV(['SATMA01-MCRX610:I-SET','SATMA01-MCRY610:I-SET','SATUN05-MCRX420:I-SET','SATUN05-MCRY420:I-SET','SFB_ORBIT_SAT:ONOFF1'])
self.kickerAT = self.initPV(['SATMA01-MCRX610:I-SET','SATMA01-MCRY610:I-SET','SATUN05-MCRX420:I-SET','SATUN05-MCRY420:I-SET','SFB_ORBIT_SAT:ONOFF1'])
# select first beamline
self.isAramis = True
@ -39,14 +39,7 @@ class AdaptiveOrbit:
def initBSStream(self,channels):
print("Initializing BSstream")
bs = BSCache(100000,10000) # 1 second time out, capazity for 100 second.
# bs.stop()
# for cnl in channels[1:]:
# if not is_available(cnl):
# raise ValueError(f"BS-Channel {cbl} is not available")
# res = make_channel_config(cnl,None,None)
# bs.channels[res]=res
# bs.get_var(channels[0]) # this starts also the stream into the cache
bs = BSCache(100000,100000) # 1000 second time out, capazity for 1000 second.
bs.get_vars(channels)
return bs
@ -54,8 +47,14 @@ class AdaptiveOrbit:
print("Initializing EPICS Channels")
pvs = []
for x in chx:
pvs.append(PV(x.replace(':X1',':X-REF-FB')))
pvs.append(PV(x.replace(':X1',':Y-REF-FB')))
if ':X1' in x:
pvs.append(PV(x.replace(':X1',':X-REF-FB')))
pvs.append(PV(x.replace(':X1',':Y-REF-FB')))
elif ':X2' in x:
pvs.append(PV(x.replace(':X2',':X-REF-FB')))
pvs.append(PV(x.replace(':X2',':Y-REF-FB')))
else:
pvs.append(PV(x))
con = [pv.wait_for_connection(timeout=0.2) for pv in pvs]
for i, val in enumerate(con):
if val is False:
@ -78,7 +77,7 @@ class AdaptiveOrbit:
if beamline == 'Aramis':
self.isAramis = True
else:
self.isAthos = True
self.isAramis = False
# all routine for accessing the machine (read/write)
@ -100,9 +99,9 @@ class AdaptiveOrbit:
if self.isAramis:
for i in range(len(fbval)):
self.pvAR[i].value = fbval[i]
return
for i in range(len(fbval)):
self.pvAT[i].value = fbval[i]
else:
for i in range(len(fbval)):
self.pvAT[i].value = fbval[i]
def getPVNames(self):
if self.isAramis:
@ -120,7 +119,6 @@ class AdaptiveOrbit:
return [pv.value for pv in self.kickerAT]
def setKicker(self,vals):
return
if self.isAramis:
for i,val in enumerate(vals):
self.kickerAR[i].value = val

View File

@ -10,6 +10,8 @@ from slic.core.acquisition import BSAcquisition, PVAcquisition
from slic.core.scanner import Scanner
from sfbd.ext import CounterAdjustable
from sfbd.ext import BSCAcquisition
from sfbd.interface import SlicScan
from bstrd import BSCache
def getAux(pvs=None):
if not pvs:
@ -19,7 +21,6 @@ def getAux(pvs=None):
for i,pv in enumerate(pvs):
if val[i]: # filter out None values
ret[pv]=float(val[i])
epics.ca.clear_cache()
return ret
@ -30,23 +31,33 @@ def getBSChannels(regexp):
if prog.match(bs['name']):
res.append(bs['name'])
return res
def getRFCalibrationChannels(sensors,energy):
aux=[]
for sen in sensors:
if 'PHASE-VS' in sen:
aux.append(sen.replace('PHASE-VS','GET-VSUM-PHASE-OFFSET').replace('RLLE-DSP','RSYS'))
aux.append(sen.replace('PHASE-VS','GET-VSUM-AMPLT-SCALE').replace('RLLE-DSP','RSYS'))
aux.append(sen.replace('PHASE-VS','SM-GET').replace('RLLE-DSP','RMSM'))
aux.append(energy)
return aux
class Dispersion:
def __init__(self, branch = 'Aramis'):
self.scanname = 'Dispersion'
self.branch = 'None'
self.branch = None
dirpath= datetime.datetime.now().strftime('measurements/%Y/%m/%d/slic_sfbd')
self.basedir = '/sf/data/%s/%s' % (dirpath,self.scanname)
self.pgroup='%s/%s' % (dirpath,self.scanname)
self.scandir='/sf/data/'+self.pgroup
self.setBranch(branch)
self.sc = None
self.setBranch(branch) # enfore Athos dump settings for now
self.Nsteps = 2
self.Nsamples = 1
self.bsc = None
def setBranch(self,branch = 'Aramis'):
self.sc = None
if branch == 'Athos Dump':
@ -55,30 +66,112 @@ class Dispersion:
self.setupAramis()
elif branch == 'Athos':
self.setupAthos()
elif branch =='Bunch Compressor 2':
self.setupBC2()
else:
self.scanner = None
self.branch = None
return self.branch
def setup(self,scl = 1, Nsteps=5, Nsamples=10):
# the setup is done from the main thread. Therefore I have to define the BSC here
self.N = Nsteps
self.Ns= Nsamples
self.scale = scl
# define stream
print('Getting BSCache')
self.bsc = BSCache(100000,10000) # 100 second timeout, size for 100 second data taken
self.bsc.get_vars(self.sensor) # this starts the stream into the cache
print('Getting BSCache done')
def scan(self):
# core routine to do all action. Will be a thread of the slic scanner wrapper
if not self.branch:
return
# path is define in the various set-up procedures
self.scanner = Scanner(data_base_dir=self.basedir,scan_info_dir=self.basedir,
make_scan_sub_dir=True,
default_acquisitions=self.acq)
def getRFCalibrationChannels(self,sensor2,energy):
aux=[]
for sen in sensor2:
if 'PHASE-VS' in sen:
aux.append(sen.replace('PHASE-VS','GET-VSUM-PHASE-OFFSET').replace('RLLE-DSP','RSYS'))
aux.append(sen.replace('PHASE-VS','GET-VSUM-AMPLT-SCALE').replace('RLLE-DSP','RSYS'))
aux.append(sen.replace('PHASE-VS','SM-GET').replace('RLLE-DSP','RMSM'))
aux.append(energy)
return aux
# define acquisition channels wrapper for BSQueue
pgroup = '%s-%s' % (self.pgroup,self.branch)
acq = [BSCAcquisition(".",pgroup, default_channels=[self.bsc])]
# define the scanner
self.scanner = Scanner(data_base_dir=self.basedir, # root directory of data location e.g. /sf/data/2023/02/02/Dispersion
scan_info_dir=self.basedir, # write also scan info there
make_scan_sub_dir=True, # put each scan in its own directory
default_acquisitions=acq) # adjustable to use
# define adjustable
self.adj = PVAdjustable(self.adjSV,pvname_readback = self.adjRB, accuracy = 0.1,ID = self.adjSV, name = self.name)
val = self.adj.get_current_value(readback=False)
if self.adj2SV:
self.adj2 = PVAdjustable(self.adj2SV,pvname_readback = self.adj2RB, accuracy = 0.1,ID = self.adjSV, name = self.name)
val2 = self.adj2.get_current_value(readback=False)
dval = self.amp*self.scale
print(self.adjSV,' - Scan Range:',val-dval,'to',val+dval)
if self.adj2SV:
print(self.adj2SV,' - Scan Range:',val2+dval,'to',val2-dval)
# create scanner backend
if self.adj2SV:
self.sc=self.scanner.a2scan(self.adj, val-dval, val+dval,
self.adj2, val2+dval, val2-dval,
n_intervals = self.N-1, # steps
n_pulses = self.Ns, # samples
filename=self.branch, # relative directory for data
start_immediately = False, # wait for execution to performe pre-action items
return_to_initial_values=True) # return to initial values
else:
self.sc=self.scanner.ascan(self.adj, val-dval, val+dval,
n_intervals = self.N-1, # steps
n_pulses = self.Ns, # samples
filename=self.branch, # relative directory for data
start_immediately = False, # wait for execution to performe pre-action items
return_to_initial_values=True) # return to initial values
# get aux data first
self.auxdata = getAux(self.aux)
self.preaction()
self.sc.run()
self.postaction()
self.bsc.stop() # stop the queue
#################################
# definition of the individual branches
def setupBC2(self):
# branch and name tag
self.branch='Bunch Compressor 2'
self.name ='BC2'
# pre-scan item - needs an adjustment of the dipole current of about 2.3e-3 - to be implemented later.
self.pre = {}
self.pre['SFB_COMPRESSION_BC2_AR:ONOFF1']={'Val':0,'InitVal':0}
self.pre['SFB_COMPRESSION_BC2_AR:ONOFF2']={'Val':0,'InitVal':0}
self.pre['SFB_ORBIT_S10:ONOFF1']={'Val':0,'InitVal':0}
for pv in self.pre.keys():
self.pre[pv]['adj']=PVAdjustable(pv)
# adjustable
self.adjSV = 'S10:SET-E-GAIN-OP'
self.adjRB = 'S10:GET-E-GAIN-OP'
self.adj2SV = 'S20:SET-E-GAIN-OP' # counter adjustable
self.adj2RB = 'S20:GET-E-GAIN-OP'
self.amp = 5 # the amplitude of the scan, which can be scaled
# acquisition
sensor1 = getBSChannels('S10[BM].*-DBPM.*:[XY]1$')
sensor2 = getBSChannels('S1.*-RLLE-DSP:.*-VS$')
self.sensor = sensor1+sensor2
# auxiliar data to be read one
self.aux = getRFCalibrationChannels(sensor2,'SINBC02-MBND100:ENERGY-OP')
def setupAthosDump(self):
# branch and name tag
self.branch='Athos_Dump'
pgroup = '%s-%s' % (self.pgroup,self.branch)
self.path = '/sf/data/'+pgroup
self.name ='SATCB01-Linac'
# pre-scan item - needs an adjustment of the dipole current of about 2.3e-3 - to be implemented later.
self.pre = {}
self.pre['SFB_BEAM_DUMP_AT:ONOFF1']={'Val':0,'InitVal':0}
@ -87,23 +180,27 @@ class Dispersion:
self.pre['SFB_ORBIT_SAT_%2.2d:ONOFF1' % i ]={'Val':0,'InitVal':0}
for pv in self.pre.keys():
self.pre[pv]['adj']=PVAdjustable(pv)
# adjustable
self.adjSV = 'SATCB01-RSYS:SET-BEAM-PHASE'
self.adjRB = 'SATCB01-RSYS:GET-BEAM-PHASE'
self.adj = PVAdjustable(self.adjSV,pvname_readback = self.adjRB, accuracy = 0.1,ID = self.adjSV, name = "SATCB01-Linac")
self.adj2SV = None
self.adj2RB = None
self.amp = 30 # the amplitude of the scan, which can be scaled
# acquisition
sensor1 = getBSChannels('SATBD02-DBPM.*:Y2$')
sensor2 = getBSChannels('SATCB.*-RLLE-DSP:.*-VS$')
self.sensor = sensor1+sensor2
self.acq = [BSAcquisition(".",pgroup, default_channels=self.sensor)]
# auxiliar data to be read one
self.aux = self.getRFCalibrationChannels(sensor2,'SATCL01-MBND100:ENERGY-OP')
self.aux = getRFCalibrationChannels(sensor2,'SATCL01-MBND100:ENERGY-OP')
def setupAramis(self):
# branch and name tag
self.branch='Aramis'
pgroup = '%s-%s' % (self.pgroup,self.branch)
self.path = '/sf/data/'+pgroup
self.name = 'Linac3'
# pre-scan item
self.pre = {}
self.pre['SFB_BEAM_DUMP_AR:ONOFF1']={'Val':0,'InitVal':0}
@ -112,23 +209,28 @@ class Dispersion:
self.pre['SFB_ORBIT_SAR:ONOFF1']={'Val':0,'InitVal':0}
for pv in self.pre.keys():
self.pre[pv]['adj']=PVAdjustable(pv)
# adjustable
self.adjSV = 'S30:SET-E-GAIN-OP'
self.adjRB = 'S30:GET-E-GAIN-OP'
self.adj = PVAdjustable(self.adjSV,pvname_readback = self.adjRB, accuracy = 0.1,ID = self.adjSV, name = "Linac3")
self.adj2SV = None
self.adj2RB = None
self.amp = 20 # the amplitude of the scan, which can be scaled
# acquisition
sensor1 = getBSChannels('SAR.*DBPM.*:[XY]1$')
sensor1 = getBSChannels('SAR[CMU].*DBPM.*:[XY]1$')
sensor2 = getBSChannels('S[23].*-RLLE-DSP:.*-VS$')
self.sensor = sensor1+sensor2
self.acq = [BSAcquisition(".",pgroup, default_channels=self.sensor)]
# auxiliar data to be read one
self.aux = self.getRFCalibrationChannels(sensor2,'S10BC02-MBND100:ENERGY-OP')
self.aux = getRFCalibrationChannels(sensor2,'S10BC02-MBND100:ENERGY-OP')
def setupAthos(self):
self.branch='Aramis'
pgroup = '%s-%s' % (self.pgroup,self.branch)
self.path = '/sf/data/'+pgroup
# branch and name tag
self.branch='Athos'
self.name = 'Linac2+3'
# pre-scan item
self.pre = {}
self.pre['SFB_BEAM_DUMP_AT:ONOFF1']={'Val':0,'InitVal':0}
@ -139,31 +241,25 @@ class Dispersion:
self.pre['SFB_ORBIT_SAT_%2.2d:ONOFF1' % i ]={'Val':0,'InitVal':0}
for pv in self.pre.keys():
self.pre[pv]['adj']=PVAdjustable(pv)
# adjustable
self.adjSV = 'S20:SET-E-GAIN-OP'
self.adjRB = 'S20:GET-E-GAIN-OP'
self.adj = PVAdjustable(self.adjSV,pvname_readback = self.adjRB, accuracy = 0.1,ID = self.adjSV, name = "Linac 2 and 3")
# self.adj2SV = 'S30:SET-E-GAIN-OP'
# self.adj2RB = 'S30:GET-E-GAIN-OP'
# self.adj2 = PVAdjustable(self.adj2SV,pvname_readback = self.adj2RB, accuracy = 0.1)
# self.adj = CounterAdjustable(self.adj1,self.adj2) # combine the two channels
self.amp = 10 # the amplitude of the scan, which can be scaled
self.adj2SV = 'S30:SET-E-GAIN-OP' # counter adjustable
self.adj2RB = 'S30:GET-E-GAIN-OP'
self.amp = 5 # the amplitude of the scan, which can be scaled
# acquisition
sensor1 = getBSChannels('SAT[SDC].*DBPM.*:[XY]2$')
sensor2 = getBSChannels('S[2].*-RLLE-DSP:.*-VS$')
self.sensor = sensor1+sensor2
self.acq = [BSAcquisition(".",pgroup, default_channels=self.sensor)]
# auxiliar data to be read one
self.aux = self.getRFCalibrationChannels(sensor2,'S10BC02-MBND100:ENERGY-OP')
self.aux = getRFCalibrationChannels(sensor2,'S10BC02-MBND100:ENERGY-OP')
def setup(self,scl = 1, Nsteps=5, Nsamples=5):
val = self.adj.get_current_value(readback=False)
# dval = self.amp*scl
dval = 0 ######## edit this
self.N = Nsteps
self.Ns= Nsamples
self.values=np.linspace(val-dval,val+dval,num=self.N)
#########################
# some basic interaction with the scan functionality
def preaction(self):
@ -174,40 +270,44 @@ class Dispersion:
def postaction(self):
for key in self.pre.keys():
self.pre[key]['adj'].set_target_value(self.pre[key]['InitVal'])
def scan(self):
self.sc=self.scanner.ascan_list(self.adj,self.values,
filename=self.branch,start_immediately = False,
n_pulses=self.Ns,return_to_initial_values=True)
# self.preaction() ######
self.sc.run()
# self.auxdata = getAux(self.aux)
# self.postaction() #######
def stop(self):
if self.sc is None:
return
self.sc.stop()
def running(self):
if self.sc is None:
return False
return self.sc.running
def status(self):
if self.sc is None:
return 0,0
si = self.sc.scan_info.to_dict()
steps = 0
if 'scan_values' in si:
steps=len(si['scan_values'])
steps=len(si['scan_values'])
return steps,self.N
def info(self):
if self.sc is None:
return None
return self.sc.scan_info.to_dict()
#--------------------
# main implementation for debugging
if __name__ == '__main__':
daq = Dispersion()
daq.setup(1,10,100)
# threaded execution
scanner=SlicScan()
scanner.start(daq,True)

5
app/runDispersionTools.sh Executable file
View File

@ -0,0 +1,5 @@
#!/bin/bash
export PYTHONPATH=/sf/bd/packages/slic:/sf/bd/packages/bstrd:$PYTHONPATH
python dispersiontools.py

View File

@ -14,7 +14,9 @@ class XTCAVStabilizer:
# the PV
self.PVTCAV = PV('SATMA02-RMSM:SM-GET',connection_timeout=0.9, callback=self.stateChanged)
self.state=self.PVTCAV.value == 9
self.PVVolt = PV('SATMA02-RSYS:SET-ACC-VOLT')
self.PVPhase = PV('SATMA02-RSYS:SET-BEAM-PHASE')
# the BS channels
self.bs = BSCache(100000,10000) # 100 second timeout, size for 100 second data taken
self.channels = ['SATBD02-DBPM040:X2','SATMA02-RLLE-DSP:PHASE-VS','SATBD02-DBPM040:X2-VALID']
@ -22,6 +24,15 @@ class XTCAVStabilizer:
self.bs.get_vars(self.channels) # this starts the stream into the cache
self.bs.stop()
def getPhase(self):
return self.PVPhase.value
def setPhase(self,phi):
self.PVPhase.set(phi)
def getVoltage(self):
return self.PVVolt.value
def stateChanged(self,pvname=None, value=0, **kws):
self.state = value == 9

View File

@ -1,6 +1,11 @@
from .snap import getSnap
from .snap import saveSnap
from .snap import loadSnap
from .snap import parseSnapShotReqYAML
from .save import saveDataset
from .save import getDatasetFileName
from .load import loadDataset
from .elog import writeElog
from .slic import SlicScan
from .doocs import doocsread
from .doocs import doocswrite

View File

@ -25,23 +25,16 @@ class SlicScan(QObject):
self.data = None
self.act = None
self.snap = None
self.doSnap = False
def start(self,daq,snap=False):
self.clear()
self.doSnap = snap
self.daq=daq
Thread(target=self.Tmonitor).start()
self.startSnap(snap)
def startSnap(self,snap=False):
if snap:
Thread(target=self.Tsnap).start()
def Tsnap(self):
self.snap = getSnap()
self.sigsnap.emit(True)
Thread(name='Scan-Monitor',target=self.Tmonitor).start()
def Tmonitor(self):
mythread = Thread(target=self.Tscanner).start()
mythread = Thread(name='Slic-Scanner',target=self.Tscanner).start()
time.sleep(1)
ostep = -1
while(self.daq.running()):
@ -58,10 +51,21 @@ class SlicScan(QObject):
if hasattr(self.daq,'auxdata'):
self.data.update(self.daq.auxdata)
self.sigterm.emit(istep==nstep)
# if requested add snapshot acquisition
if self.doSnap:
self.startSnap()
def Tscanner(self):
self.daq.scan()
def startSnap(self):
Thread(name='Snap-Acquisition',target=self.Tsnap).start()
def Tsnap(self):
self.snap = getSnap()
print('Acquired snap')
self.sigsnap.emit(True)
def stop(self):
self.daq.stop()

View File

@ -57,15 +57,14 @@ def getSnap(pvs=None):
ret={}
val = epics.caget_many(pvs)
for i,pv in enumerate(pvs):
if val[i]: # filter out None values
if not val[i] is None: # filter out None values
ret[pv]=float(val[i])
epics.ca.clear_cache()
return ret
def saveSnap(pvs={},label="", comment = "generated by application"):
def saveSnap(pvs={},label="", comment = "generated by application",reqfile = "SF_settings.yaml"):
filename = datetime.datetime.now().strftime('/sf/data/applications/snapshot/SF_settings_%Y%m%d_%H%M%S.snap')
with open(filename,'w') as fid:
fid.write('#{"labels":["%s"],"comment":"%s", "machine_parms":{}, "save_time": 0.0, "req_file_name": "SF_settings.yaml"}\n' % (label,comment))
fid.write('#{"labels":["%s"],"comment":"%s", "machine_parms":{}, "save_time": 0.0, "req_file_name": "%s"}\n' % (label,comment,reqfile))
for key in pvs.keys():
if isinstance(pvs[key],int):
fid.write('%s,{"val": %d}\n' % (key,pvs[key]))
@ -73,6 +72,21 @@ def saveSnap(pvs={},label="", comment = "generated by application"):
fid.write('%s,{"val": %f}\n' % (key,pvs[key]))
elif isinstance(pvs[key],str):
fid.write('%s,{"val": %s}\n' % (key,pvs[key]))
return filename
def loadSnap(filename):
res={}
with open(filename,'r') as fid:
lines=fid.readlines()
for line in lines[1:]:
split = line.split(',')
pv = split[0].strip()
val = split[1].split('"val":')[1].split('}')[0]
if '.' in val:
res[pv]=float(val)
else:
res[pv]= val
return res

View File

@ -0,0 +1,33 @@
import signal
import ServerBase
class ServerTemplate(ServerBase.ServerBase):
def __init__(self, PVroot = 'MyServer', debug = False):
self.version='1.0.0'
self.program ='Server Template'
super(ServerTemplate, self).__init__(PVroot,debug,'127.0.0.1', 5678) # last too numbers are the IP adress and port of watchdog
# connect to the individual handler, which must have the function terminate() implemented
# each handler should be their own thread to not interfere with the main process-loop
self.handler={}
def terminateSubThreads(self):
for subserver in self.handler.keys():
self.handler[subserver].terminate()
if __name__ == '__main__':
debug = True
server = ServerTemplate('SF-BC-SERVER', debug)
signal.signal(signal.SIGTERM,server.terminate)
try:
server.run()
except KeyboardInterrupt:
server.terminate(None,None)

View File

@ -0,0 +1,5 @@
from .zmqbase import ZMQBase
from .serverbase import ServerBase
from .simplecapture import SimpleCapture

92
util/serverbase.py Normal file
View File

@ -0,0 +1,92 @@
import sys
import signal
import os
import socket
import logging
import logging.handlers
from logging.handlers import RotatingFileHandler
from datetime import datetime
import time
from epics import PV
sys.path.append('/sf/bd/packages/sfbd')
from sfbd.util import ZMQBase
class ServerBase(ZMQBase):
def __init__(self, root = 'MyServer', debug = False, WDServer = '127.0.0.1', WDPort = 5678):
super(ServerBase,self).__init__(WDServer,WDPort)
self.debug = debug
self.root = root
self.suffix=''
if self.debug:
self.suffix='-SIMU'
self.host = socket.gethostname()
self.pid = os.getpid() # process ID
# enabling logging
self.logfilename="/sf/data/applications/BD-SERVER/%s.log" % self.root
handler = RotatingFileHandler(filename=self.logfilename,
mode='a',
maxBytes=5 * 1024 * 1024,
backupCount=1,
delay=0)
if self.debug:
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
datefmt='%m-%d %H:%M:%S')
else:
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
datefmt='%m-%d %H:%M:%S',
handlers=[handler,])
self.logger=logging.getLogger(self.program)
# setting up ZMQ interface
self.ZMQServerInfo(self.root,self.host,self.pid)
# individual channels of main thread
self.PVstop = PV('%s:STOP%s' % (self.root,self.suffix))
self.PVstop.value = 0
self.PVstop.add_callback(self.stop)
self.PVping = PV('%s:PING%s' % (self.root,self.suffix))
self.PVlog = PV('%s:LOG%s' % (self.root,self.suffix))
def stop(self,pvname=None,value=None,**kws):
if value > 0:
self.logger.info('PV:STOP triggered at %s' % datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
self.running=False
def start(self):
self.logger.info('Starting Server: %s at %s' % (self.root,datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
self.logger.info('PV Root: %s' % self.root)
self.logger.info('Version: %s' % self.version)
self.logger.info('Host: %s' % self.host)
self.logger.info('PID: %d' % self.pid)
if self.debug:
self.logger.info('Debug Mode')
def run(self):
self.start()
self.running=True
while self.running:
time.sleep(1)
if self.ZMQPoll():
self.logger.info('Watchdog Server requested termination')
self.running = False
self.PVping.value = datetime.now().strftime('Last active at %Y-%m-%d %H:%M:%S')
self.terminate(None,None)
def terminate(self,signum,frame):
# stopping any sub thread with the server specific function terminateSubThreads
self.terminateSubThreads()
self.logger.info('Terminating Server at %s' % datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
self.ZMQPoll('quit') # informing the watchdog
print('Bunch Compressor Server is quitting...')
sys.exit(0)

70
util/zmqbase.py Normal file
View File

@ -0,0 +1,70 @@
import zmq
import socket
import sys
class ZMQBase:
def __init__(self, host='127.0.0.1',port = 5678):
self.host=host
self.port = port
self.msg={'action':'','PV':'','host':'','pid':0}
self.REQUEST_TIMEOUT = 500
self.REQUEST_RETRIES = 2
self.SERVER_ENDPOINT = "tcp://%s:%d" % (host,port)
self.serverIsOffline=False # assume that it is online
def ZMQServerInfo(self,PVroot,host,pid):
self.msg['PV']=PVroot
self.msg['host']=host
self.msg['pid']=pid
def ZMQPoll(self,tag='ping'):
self.msg['action']=tag
context = zmq.Context()
client = context.socket(zmq.REQ)
client.connect(self.SERVER_ENDPOINT)
client.send_pyobj(self.msg)
retries_left = self.REQUEST_RETRIES
while True:
if (client.poll(self.REQUEST_TIMEOUT) & zmq.POLLIN) != 0:
reply = client.recv_pyobj()
check = self.ZMQIdentifyReply(reply)
if self.serverIsOffline:
self.logger.info("Watchdog server came online")
self.serverIsOffline=False
if check:
if reply['action'] == 'quit':
client.close()
context.term()
return (reply['action'] == 'quit')
else:
self.logger.warning("Malformed reply from server")
continue
retries_left -= 1
# Socket is confused. Close and remove it.
client.setsockopt(zmq.LINGER, 0)
client.close()
if retries_left == 0:
if not self.serverIsOffline:
self.logger.info("Watchdog server seems to be offline")
self.serverIsOffline=True
context.term()
return False
# Create new connection
client = context.socket(zmq.REQ)
client.connect(self.SERVER_ENDPOINT)
client.send_pyobj(self.msg)
def ZMQIdentifyReply(self,reply):
for field in ['PV','host','pid']:
if not reply[field] == self.msg[field]:
return False
return True