# servicemanager/base.py
# (file listing header: 406 lines, 16 KiB, Python)

# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
import json
import os
from os.path import expanduser
import subprocess
import time
import re
import socket
import psutil
from collections import OrderedDict, defaultdict
from configparser import ConfigParser
class ServiceDown(Exception):
    """Raised when a service that is expected to run has no running process."""
class UsageError(Exception):
    """Raised for invalid usage, e.g. when an unknown action is requested."""
def printTable(headers, items, printfunc, minlen=0, rjust=False):
    """Print rows of strings as an aligned text table.

    The width of each column is the length of its widest cell (at least
    ``minlen``). A row may contain fewer cells than there are columns. In
    that case its last cell is ignored when computing the widths, so it may
    extend over the following columns. Stolen from nicos.utils and modified.

    :param headers: column titles, or a falsy value for no header lines.
        When given, they define the number of columns; otherwise the first
        row of ``items`` does.
    :param items: list of rows, each a sequence of strings
    :param printfunc: called once per output line with the formatted line
    :param minlen: minimal width of every column
    :param rjust: right-align the body rows (header lines are always
        left-aligned)
    """
    if not (headers or items):
        return
    ncolumns = len(headers) if headers else len(items[0])
    widths = [minlen] * ncolumns
    for row in [headers or []] + items:
        # a short row's last cell may overflow, so it does not count
        counted = row if len(row) == ncolumns else row[:-1]
        for col, cell in enumerate(counted):
            widths[col] = max(widths[col], len(cell))
    left_fmt = ''.join('%%-%ds ' % width for width in widths)
    right_fmt = ''.join('%%%ds ' % width for width in widths)
    if headers:
        printfunc(left_fmt % tuple(headers))
        printfunc(left_fmt % tuple('=' * width for width in widths))
    row_fmt = right_fmt if rjust else left_fmt
    for row in items:
        missing = ncolumns - len(row)
        printfunc(row_fmt % (tuple(row) + ('',) * missing))
def run(serv, arglist):
    """Dispatch command line arguments to the service manager <serv>.

    The arguments have the form ``[action] [instance] [args...]``:

    - The first argument is taken as the action if <serv> has a matching
      ``do_<action>`` method. Otherwise the action defaults to 'gui'.
    - The next argument is taken as the instance name, unless it is empty or
      one of ``serv.services``. When no instance is given and <serv> knows
      exactly one instance (``serv.info``), that one is used.
    - The remaining arguments are passed on unchanged.

    The action is then executed via ``serv.action(action, instance, *args)``
    (the instance is omitted when none could be determined). Exceptions
    raised by the action propagate unchanged.

    :param serv: the service manager
    :param arglist: list of command line arguments (not modified)
    """
    # A trailing dummy argument lets us inspect arglist[0] without checking
    # for an empty list first. It is never consumed: an argument is only
    # popped as the instance when it is non-empty.
    arglist = arglist + ['']
    action = arglist.pop(0) if hasattr(serv, 'do_' + arglist[0]) else 'gui'
    instance = arglist.pop(0) if arglist[0] and arglist[0] not in serv.services else None
    if instance is None and len(serv.info) == 1:
        instance = list(serv.info)[0]
    if instance is not None:
        arglist.insert(0, instance)
    arglist.pop()  # remove dummy argument
    # Any exception (including AttributeError) propagates unchanged. The
    # former conversion of AttributeError into ValueError was unreachable,
    # because it followed a bare re-raise.
    serv.action(action, *arglist)
class ServiceManager:
    """Base class for managing the services of one group.

    Subclasses set the class attributes below. The configuration is read
    from ~/servman.cfg, with one section per instance.
    """
    services = None  # service names of this group; stopped in reverse order
    need_cfg = False
    start_dir = None
    group = None  # group name; its uppercase form prefixes the config keys
    all = {}  # for the list command, we want to register all service managers
    virtualenv = None
    pkg = ''  # substituted for %(pkg)s in the command template
    # command template -> group name. This is a class attribute that is
    # mutated in place, so (unless a subclass overrides it) it is shared by
    # all service managers and lets get_procs find processes of any group.
    revcmd = {}

    def __init__(self):
        self.env = {}
        self.commands = {}
        self.info = {}
        self.all[self.group] = self
        self.get_info()
        # dict[<ins>][<service>] of the command lines of stopped processes
        self.stopped = defaultdict(dict)

    def get_services(self, section):
        """Return a dict[<service>] of <port> for the services enabled in <section>.

        A service is enabled when <GROUP>_<SERVICE>_PORT is set, or when
        <GROUP>_<SERVICE> is a true JSON value (its port is then 0). The
        string 'nr' in a port is replaced by the two-digit instance number
        section[<group>]. The replaced port is also written back to
        <section>.
        """
        ports = {}
        nr = '%02d' % int(section[self.group])
        gr = self.group.upper()
        for service in self.services:
            sv = '%s_%s' % (gr, service.upper())
            key = '%s_PORT' % sv
            port = section.get(key)
            if port or json.loads(section.get(sv, '0').lower()):
                # e.g. NICOS_POLLER = True leads to port = 0
                port = (port or '0').replace('nr', nr)
                ports[service] = int(port)
                if key in section:  # update replaced nr
                    section[key] = port
        return ports

    def get_info(self):
        """Re-read ~/servman.cfg and update the information about all instances.

        Every config section containing the key <group> is an instance of
        this group. For each such instance <ins>, this sets:

        - self.info[<ins>]: dict[<service>] of <port> (see get_services)
        - self.commands[<ins>]: the command template (<group>_command)
        - self.env[<ins>]: values of the uppercase keys of the DEFAULT
          section, with '%(ins)s' substituted and '~' expanded in every
          ':'-separated element

        Nothing is returned.
        """
        result = OrderedDict()
        parser = ConfigParser(interpolation=None)
        parser.optionxform = str  # keep the case of keys (env variables are uppercase)
        parser.read(expanduser('~/servman.cfg'))
        defaults = parser['DEFAULT']
        self.commands = {}

        def expand_path(pathlist, ins):
            return ':'.join(expanduser(p % dict(ins=ins)) for p in pathlist.split(':'))

        for ins in parser.sections():
            section = dict(parser[ins])  # includes the DEFAULT values
            command = section.get('%s_command' % self.group)
            if command:
                # Registering None would later crash get_procs
                self.revcmd[command] = self.group
            if self.group in section:
                self.commands[ins] = command
                services = self.get_services(section)
                env = {k: expand_path(section.get(k), ins) for k in defaults if k.isupper()}
                result[ins] = services
                self.env[ins] = env
        self.info = result

    def get_ins_info(self, ins):
        """Re-read the config and return dict[<service>] of <port> for <ins>.

        :raises KeyError: when <ins> is not an instance of this group
        """
        self.get_info()
        return self.info[ins]

    def get_cfg(self, ins, service):
        """return cfg info about running programs, if relevant
        example for sea: return samenv name
        """
        return ''

    def get_procs(self, groups=None, cfginfo=None):
        """Return the running processes of the given groups.

        Processes are found by matching their command lines against the
        registered command templates (self.revcmd) of the given groups.

        :param groups: container of group names (default: only this group)
        :param cfginfo: if not None, a dict which is filled with
            {(ins, serv): cfg} for templates containing %(cfg)s
        :return: dict[ins] of dict[service] of list of psutil.Process
        """
        result = {}
        cmdpatterns = []
        if groups is None:
            groups = [self.group]
        for cmd, group in self.revcmd.items():
            if not cmd or group not in groups:
                continue
            parts = cmd.split(None, 1)
            if len(parts) < 2:
                continue  # without arguments, ins and serv can not be identified
            args = parts[1]
            cmdpatterns.append(
                re.compile('.* ' +  # do not match prog, as it might be modified by the os
                           (args % dict(ins=r'(?P<ins>\S*)',
                                        serv=r'(?P<serv>\S*)',
                                        cfg=r'(?P<cfg>\S*)',
                                        port=r'\S*',
                                        pkg=r'\S*'))))
        for p in psutil.process_iter(attrs=['pid', 'cmdline']):
            cmdline = p.info['cmdline']
            if not cmdline:
                continue
            cmd = ' '.join(cmdline)
            for cmdpat in cmdpatterns:
                match = cmdpat.match(cmd)
                if match:
                    gdict = match.groupdict()
                    ins = gdict['ins']
                    serv = gdict['serv']
                    if cfginfo is not None and 'cfg' in gdict:
                        cfginfo[ins, serv] = gdict['cfg']
                    result.setdefault(ins, {}).setdefault(serv, []).append(p)
                    break  # count each process only once, even if several patterns match
        return result

    def check_running(self, ins, service):
        """Raise unless <service> of instance <ins> has a running process.

        :raises KeyError: when <ins> is unknown
        :raises ServiceDown: when no matching process is found
        """
        self.get_info()
        if ins not in self.info:
            raise KeyError("don't know %r" % ins)
        if not self.get_procs().get(ins, {}).get(service):
            raise ServiceDown('%s %s is not running' % (service, ins))

    def stop(self, ins, service=None):
        """stop service (or all services) of instance <ins>

        The command lines of stopped processes are stored in
        self.stopped[<ins>][<service>]; this information may be used for
        restarts. Returns True when at least one process was stopped.
        """
        procs = self.get_procs()
        done = False
        services = self.services if service is None else [service]
        for service in reversed(services):
            for p in procs.get(ins, {}).get(service, []):
                stopped, action = self._stop_process(p, ins, service)
                if stopped:
                    self.stopped[ins][service] = ' '.join(p.info['cmdline'])
                    done = True
                print('%s %s %s' % (ins, service, (action + 'ed').replace('eed', 'ed')))
        return done

    @staticmethod
    def _stop_process(proc, ins, service):
        """Terminate <proc>, escalating to kill when it does not exit in time.

        Returns (stopped, action). <stopped> is True when the process is gone.
        <action> is the last step taken: 'terminate', 'kill' or 'kill fail'.
        """
        print_wait = True
        for action in ('terminate', 'kill'):
            try:
                getattr(proc, action)()  # proc.terminate() or proc.kill()
            except psutil.NoSuchProcess:
                return True, action  # it exited before we could signal it
            for i in range(10):  # total 0.1 * 10 * 9 / 2 = 4.5 sec
                try:
                    proc.wait(0.1 * i)
                    return True, action
                except psutil.TimeoutExpired:
                    pass
                try:
                    if proc.status() == psutil.STATUS_ZOMBIE:
                        return True, action  # dead, only not yet reaped by its parent
                except psutil.NoSuchProcess:
                    return True, action
                if print_wait and i > 4:
                    print('wait for %s %s' % (ins, service))
                    print_wait = False
        return False, 'kill fail'

    def do_stop(self, ins, service=None, *args):
        """stop service (or all services) of instance <ins>"""
        self.get_info()
        if not self.stop(ins, service):
            print('nothing to stop')

    def prepare_start(self, ins):
        """Return (start directory, environment variables) for instance <ins>.

        The start directory is the <GROUP>_ROOT variable ('' if not set).
        """
        if ins not in self.env:
            self.get_info()
        gr = self.group.upper()
        env = self.env[ins]
        return env.get('%s_ROOT' % gr, ''), env

    def do_start(self, ins, service=None, cfg='', restart=False, wait=False):
        """start service (or all services) of instance <ins>

        <cfg> is substituted for %(cfg)s in the command template. When the
        template needs a cfg and none is given, the command line recorded by
        a previous stop is used instead.

        :param restart: stop the service(s) first; otherwise already running
            services are skipped
        :param wait: run the command in the foreground, wait for it to finish
            and return (used by do_run)
        :raises ValueError: for an unknown instance or a missing cfg
        """
        if ins is None:
            print('nothing to start')
            return
        try:
            service_ports = self.get_ins_info(ins)
        except KeyError:
            # get_ins_info raises KeyError for an unknown instance
            raise ValueError('do not know %r' % ins) from None
        services = list(service_ports) if service is None else [service]
        if restart:
            self.stop(ins, service)
        else:
            running = self.get_procs().get(ins, {})
            to_start = []
            for serv in services:
                n = len(running.get(serv, []))
                if n == 0:
                    to_start.append(serv)
                else:
                    count = '' if n == 1 else ' %sx' % n
                    print('%s %s is already running%s' % (ins, serv, count))
            services = to_start
        for service_i in services:
            port = service_ports[service_i]
            cmd = self.commands[ins] % dict(ins=ins, serv=service_i, port=port, cfg=cfg, pkg=self.pkg)
            if '%(cfg)s' in self.commands[ins] and not cfg:
                # reuse the command line of the previously stopped process
                cmd = self.stopped[ins].get(service_i)
                if not cmd:
                    if restart and service is None:
                        continue  # silently ignore missing cfg when restarting all services
                    raise ValueError('missing cfg for %s %s' % (ins, service_i))
            argv = cmd.split()
            wd = os.getcwd()
            try:
                start_dir, env = self.prepare_start(ins)
                env = dict(os.environ, **env)
                if start_dir:  # os.chdir('') fails when no <GROUP>_ROOT is configured
                    os.chdir(start_dir)
                if wait:
                    proc = subprocess.Popen(argv, env=env)
                    proc.wait()
                    return
                process = subprocess.Popen(argv, env=env, stdout=subprocess.DEVNULL,
                                           stderr=subprocess.DEVNULL)
                if not port:
                    print('%s %s started' % (ins, service_i))
                    continue
                print_wait = True
                for i in range(10):  # total 0.1 * 10 * 9 / 2 = 4.5 sec
                    returnvalue = process.poll()
                    if returnvalue is not None:
                        print('started process finished with %r' % returnvalue)
                        # run the command again, this time capturing stderr, to show the reason
                        process = subprocess.Popen(argv, env=env, stdout=subprocess.DEVNULL,
                                                   stderr=subprocess.PIPE)
                        try:
                            _, erroutput = process.communicate(timeout=10)
                        except subprocess.TimeoutExpired:
                            process.kill()
                            _, erroutput = process.communicate()
                        print(erroutput.decode(errors='replace'))
                        print('%s %s died' % (ins, service_i))
                        break
                    try:
                        if print_wait and i > 4:
                            print('wait for port %s' % port)
                            print_wait = False
                        with socket.create_connection(('localhost', port), timeout=5):
                            pass
                    except socket.error:
                        time.sleep(0.1 * i)
                        continue
                    if restart:
                        print('%s %s restarted' % (ins, service_i))
                    else:
                        print('%s %s started' % (ins, service_i))
                    break
                else:
                    print(cmd)
                    print('starting %s %s failed' % (ins, service_i))
            finally:
                os.chdir(wd)

    def do_restart(self, ins, service=None, cfg=None):
        """restart service (or all services) of instance <ins>"""
        self.do_start(ins, service, cfg, True)

    def do_run(self, ins, service, cfg=None):
        """for tests: run and wait"""
        self.do_start(ins, service, cfg, wait=True)

    def do_list(self, ins=None, *args):
        """Print a table of the running services of all registered groups.

        Only instances with a running service of this group are shown, unless
        <ins> is 'all'. Any other value of <ins> restricts the table to that
        instance.
        """
        cfginfo = {}
        procs = self.get_procs(self.all, cfginfo)
        rows = []
        merged = OrderedDict()
        show_unused = ins == 'all'
        if show_unused:
            ins = None
        for group, sm in self.all.items():
            sm.get_info()
            for ins_i, info_dict in sm.info.items():
                if ins is not None and ins != ins_i:
                    continue
                for serv, port in info_dict.items():
                    if ins_i not in merged:
                        merged[ins_i] = {g: {} for g in self.all}
                    merged[ins_i][group][serv] = port
        for ins_i, info_dict in merged.items():
            show_ins = show_unused
            run_info = [[''], [ins_i]]
            procs_dict = procs.get(ins_i, {})
            for group, sm in self.all.items():
                info_grp = info_dict.get(group, {})
                for serv, port in info_grp.items():
                    plist = procs_dict.get(serv)
                    if plist:
                        cfg = cfginfo.get((ins_i, serv), '') or sm.get_cfg(ins_i, serv)
                        if sm == self:
                            show_ins = True
                        gs = '%s %s' % (group, serv)
                        port = str(port or '')
                        run_info.append(('', gs, port, cfg))
                        if len(plist) > 1:
                            # plist holds psutil.Process objects (see get_procs), not tuples
                            rows.append(['', ' WARNING: multiple processes %s'
                                         % ', '.join(str(p.pid) for p in plist)])
                extra = sm.extra_info(ins_i)
                if extra and show_ins:
                    run_info.append(['', extra])
            if show_ins:
                rows.extend(run_info)
        print('')
        printTable(('inst', 'service', 'port', 'cfg'), rows, print)

    @staticmethod
    def extra_info(ins):
        """provide extra info or None"""
        return None

    def action(self, action, *args):
        """Call the method do_<action> with <args>.

        :raises UsageError: when there is no such action
        """
        method = getattr(self, 'do_' + action, None)
        if not callable(method):
            raise UsageError('%s is no valid action' % action)
        # A TypeError from wrong arguments propagates unchanged. The former
        # translation into UsageError was unreachable (it followed a bare raise).
        method(*args)