servicemanager/base.py

479 lines
18 KiB
Python

# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
import sys
import json
import os
from os.path import expanduser, basename
import subprocess
import time
import re
import socket
import psutil
from collections import OrderedDict, defaultdict
from configparser import ConfigParser
class ServiceDown(Exception):
    """Raised when a service expected to be running is not running."""
class UsageError(Exception):
    """Raised for invalid usage: unknown action, bad or missing arguments."""
def printTable(headers, items, printfunc, minlen=0, rjust=False):
    """Print tabular information nicely formatted.

    Rows may have fewer columns than the table. The last item of such a row
    may extend over the remaining columns. Stolen from nicos.utils and modified.

    :param headers: sequence of column titles, or None/empty for no header lines
    :param items: iterable of rows, each a sequence of strings
    :param printfunc: called once per output line with the formatted string
    :param minlen: minimum width of every column
    :param rjust: right-justify the data rows (headers are always left-justified)
    """
    # accept any iterable of rows (tuple, generator), not only a list:
    # the code below needs len/indexing and list concatenation
    items = list(items)
    if not headers and not items:
        return
    ncolumns = len(headers or items[0])
    rowlens = [minlen] * ncolumns
    for row in [headers or []] + items:
        # do not consider length of last column when row has less columns
        cntrow = row if len(row) == ncolumns else row[:-1]
        for i, item in enumerate(cntrow):
            rowlens[i] = max(rowlens[i], len(item))
    rfmtstr = ('%%%ds ' * ncolumns) % tuple(rowlens)
    lfmtstr = ('%%-%ds ' * ncolumns) % tuple(rowlens)
    if headers:
        printfunc(lfmtstr % tuple(headers))
        printfunc(lfmtstr % tuple('=' * n for n in rowlens))
    rowfmt = rfmtstr if rjust else lfmtstr
    for row in items:
        # pad short rows with empty cells, the format string expects ncolumns values
        printfunc(rowfmt % (tuple(row) + ('',) * (ncolumns - len(row))))
def get_config():
    """Read ~/servicemanager.cfg and return the ConfigParser.

    Interpolation is disabled, and option names keep their case
    (ConfigParser lower-cases them by default).
    """
    cfgfile = expanduser('~/servicemanager.cfg')
    cfgparser = ConfigParser(interpolation=None)
    cfgparser.optionxform = str  # keep the case of option names
    cfgparser.read(cfgfile)
    return cfgparser
class ServiceManager:
    """Base class for managing the service processes of one group of programs.

    Subclasses set ``group`` and ``services``. The instances, their port
    numbers, start commands and environment are read from the config file
    (see get_config) by get_info.
    """
    services = ['SINGLE']
    need_cfg = False  # not used in this class, presumably for subclasses
    start_dir = None  # not used in this class, presumably for subclasses
    group = None
    all = {}  # for the list command, we want to register all service managers
    virtualenv = None  # not used in this class, presumably for subclasses
    pkg = ''  # substituted for %(pkg)s in start commands
    # maps start command templates to their group; shared by all service
    # managers, so that get_procs can find the processes of every group
    revcmd = {}
    USAGE = None  # usage text with one %s placeholder, see do_help
    main_ins = None  # instance name used for the config section 'MAIN'

    def __init__(self):
        self.env = {}  # dict[ins] of environment variables for starting services
        self.remote_hosts = {}  # dict[ins] of REMOTE_HOST
        self.commands = {}  # dict[ins] of start command template
        self.info = {}  # dict[ins] of dict[service] of port, see get_info
        self.all[self.group] = self  # register for the list command
        self.get_info()
        # dict[ins][service] of command line of processes stopped by self.stop
        self.stopped = defaultdict(dict)

    def get_services(self, section):
        """return the port numbers of the services configured in <section>

        :param section: dict of the options of one instance, must contain
            the instance number under the key <group>
        :return: dict[<service>] of port number. The port is 0 when the
            service is enabled by <GROUP>_<SERVICE> = true without a port.

        The '<GROUP>_<SERVICE>_PORT' items of <section> are updated with
        the port after replacing 'nr' by the instance number.
        """
        ports = {}
        nr = '%02d' % int(section[self.group])
        gr = self.group.upper()
        singlekey = gr + '_PORT'
        for service in self.services:
            sv = gr + '_' + service.upper()
            key = '%s_PORT' % sv
            port = section.get(key) or section.get(singlekey)
            if port or json.loads(section.get(sv, '0').lower()):
                # e.g. NICOS_POLLER = True leads to port = 0
                port = (port or '0').replace('nr', nr)
                ports[service] = int(port)
                if key in section:  # update replaced nr
                    section[key] = port
        return ports

    def get_info(self):
        """read the config file and update port numbers, commands and environment

        Sets self.info to an OrderedDict[<ins>] of dict[<service>] of port.
        It contains only the instances having a number for this group.
        Also updates self.commands, self.env, self.remote_hosts and
        self.revcmd, and self.main_ins when a section 'MAIN' exists.
        """
        result = OrderedDict()
        parser = get_config()
        self.commands = {}

        def get_subs(section, key, ins, nr):
            """get item <key> from section and substitute nr or expand filename"""
            value = section.get(key)
            if key.endswith('_PORT'):
                return value.replace('nr', nr)
            value = value.replace('<INS>', ins)
            return ':'.join(expanduser(p % dict(ins=ins)) for p in value.split(':'))

        for ins in parser.sections():
            section = dict(parser[ins])
            if ins == 'MAIN':
                ins = self.main_ins = basename(expanduser('~'))
            if 'REMOTE_HOST' in section:
                self.remote_hosts[ins] = section['REMOTE_HOST']
            command = section.get('%s_command' % self.group)
            if command:
                # a missing command must not be registered: get_procs would fail on None
                self.revcmd[command] = self.group
            nr = section.get(self.group)
            if nr is not None:
                nr = '%02d' % int(nr)
                self.commands[ins] = command
                services = self.get_services(section)
                env = {k: get_subs(section, k, ins, nr) for k in section if k.isupper()}
                result[ins] = services
                self.env[ins] = env
        self.info = result

    def get_ins_info(self, ins):
        """re-read the config and return dict[<service>] of port for instance <ins>

        :raises KeyError: when <ins> has no number for this group
        """
        self.get_info()
        return self.info[ins]

    def get_cfg(self, ins, service):
        """return cfg info about running programs, if relevant

        example for sea: return samenv name
        """
        return ''

    def get_procs(self, groups=None, cfginfo=None):
        """return the running processes of the given groups

        Processes are found by matching their command line against the
        registered start commands in self.revcmd.

        :param groups: container of group names (default: only this group)
        :param cfginfo: optional dict, filled with {(ins, serv): cfg} for
            commands containing %(cfg)s
        :return: dict[ins] of dict[service] of list of psutil.Process
        """
        result = {}
        cmdpatterns = []
        if groups is None:
            groups = [self.group]
        for cmd, group in self.revcmd.items():
            if group not in groups:
                continue
            parts = cmd.split(None, 1) if cmd else []
            if len(parts) < 2:
                continue  # no arguments: nothing to match besides the program
            cmdpatterns.append(
                re.compile('.* ' +  # do not match prog, as it might be modified by the os
                           (parts[1] % dict(ins=r'(?P<ins>\S*)',
                                            serv=r'(?P<serv>\S*)',
                                            cfg=r'(?P<cfg>\S*)',
                                            port=r'\S*',
                                            pkg=r'\S*'))))
        for p in psutil.process_iter(attrs=['pid', 'cmdline']):
            cmdline = p.info['cmdline']
            if not cmdline:
                continue
            cmd = ' '.join(cmdline)
            for cmdpat in cmdpatterns:
                match = cmdpat.match(cmd)
                if match:
                    gdict = match.groupdict()
                    ins = gdict.get('ins', self.main_ins)
                    serv = gdict.get('serv', '')
                    if cfginfo is not None and 'cfg' in gdict:
                        cfginfo[ins, serv] = gdict['cfg']
                    result.setdefault(ins, {}).setdefault(serv, []).append(p)
                    # count each process only once, even when several patterns match
                    break
        return result

    def wildcard(self, ins):
        """expand an instance name containing wildcards

        Both '*' and '.' match any sequence of characters, and the whole
        instance name must match.

        :return: list of the matching configured instances (all of them for
            None or 'all'), or None when <ins> contains no wildcard
        """
        if ins is None or ins == 'all':
            return list(self.info)
        parts = re.split(r'[.*]', ins)
        if len(parts) == 1:
            return None
        # escape the literal parts, and match the full name, not only a prefix
        pat = re.compile('.*'.join(re.escape(part) for part in parts))
        return [k for k in self.info if pat.fullmatch(k)]

    def check_running(self, ins, service):
        """raise ServiceDown if <service> of instance <ins> is not running

        Instances on a remote host are not checked.
        :raises KeyError: when <ins> is not configured for this group
        """
        self.get_info()
        if ins not in self.info:
            raise KeyError("don't know %r" % ins)
        sp_ins = ' ' + ins if ins != self.main_ins else ''
        if ins in self.remote_hosts:
            return
        if not self.get_procs().get(ins, {}).get(service):
            startcmd = '%s start%s' % (self.group, sp_ins)
            raise ServiceDown('%s%s is not running - please use %r' % (self.group, sp_ins, startcmd))

    def _stop_process(self, proc, ins, service):
        """terminate <proc>, and kill it when it does not end within about 4.5 sec

        When the process has ended, its command line is remembered in
        self.stopped[ins][service]. do_start uses it when no cfg is given.
        :return: outcome for the user message: 'terminated', 'killed' or 'kill failed'
        """
        print_wait = True
        for action in ('terminate', 'kill'):
            try:
                getattr(proc, action)()  # proc.terminate() or proc.kill()
            except psutil.NoSuchProcess:
                pass  # already gone: the wait below returns at once
            for i in range(10):  # total 0.1 * 10 * 9 / 2 = 4.5 sec
                try:
                    proc.wait(0.1 * i)
                except psutil.TimeoutExpired:
                    if proc.status() == psutil.STATUS_ZOMBIE:
                        break  # ended, but not yet reaped by its parent
                    if print_wait and i > 4:
                        print('wait for %s %s' % (ins, service))
                        print_wait = False
                    continue
                break
            else:  # still running after all waits
                if action == 'kill':
                    return 'kill failed'
                continue  # terminate did not help: try kill
            # ended (or zombie): remember the command line for a later restart
            self.stopped[ins][service] = ' '.join(proc.info['cmdline'])
            return (action + 'ed').replace('eed', 'ed')

    def stop(self, ins, service=None):
        """stop service (or all services) of instance <ins>

        :return: list of instances when <ins> contains a wildcard (the caller
            has to repeat for each), else True when any process was handled
        """
        ins_list = self.wildcard(ins)
        if ins_list is not None:
            return ins_list
        procs = self.get_procs().get(ins, {})
        done = False
        services = self.services if service is None else [service]
        for serv in reversed(services):  # reverse order of self.services
            for p in procs.get(serv, []):
                outcome = self._stop_process(p, ins, serv)
                done = True
                print('%s %s %s' % (ins, serv, outcome))
        return done

    def do_stop(self, ins, service=None, *args):
        """stop service (or all services) of instance <ins>

        :return: list of instances when <ins> contains a wildcard, else None
        """
        self.get_info()
        ins_list = self.wildcard(ins)
        if ins_list is not None:
            return ins_list
        if not self.stop(ins, service):
            print('nothing to stop')

    def prepare_start(self, ins, service, cfg=''):
        """return (start directory, environment variables) for starting a service

        The start directory is the <GROUP>_ROOT item of the environment, or ''.
        """
        if ins not in self.env:
            self.get_info()
        gr = self.group.upper()
        env = self.env[ins]
        return env.get('%s_ROOT' % gr, ''), env

    def do_start(self, ins, service=None, cfg='', restart=False, wait=False, logger=None):
        """start service (or all services) of instance <ins>

        :param cfg: substituted for %(cfg)s in the start command. When empty,
            the command line of a previously stopped process is used.
        :param restart: stop the service(s) before starting
        :param wait: run the (first) command in the foreground and wait for it
        :param logger: object with info and error methods (default: print)
        :return: list of instances when <ins> contains a wildcard, else None
        :raises ValueError: for an unknown instance or one on a remote host
        :raises UsageError: when a needed cfg is missing
        """
        ins_list = self.wildcard(ins)
        if ins_list is not None:
            return ins_list
        if logger is None:
            class logger:
                """fallback logger printing to stdout"""

                @staticmethod
                def info(fmt, *args):
                    print(fmt % args)

                @staticmethod
                def error(fmt, *args):
                    print(('ERROR: ' + fmt) % args)

        if ins is None:
            logger.info('nothing to start')
            return
        try:
            service_ports = self.get_ins_info(ins)
        except KeyError:  # get_ins_info raises KeyError for unknown instances
            raise ValueError('do not know %r' % ins) from None
        if ins in self.remote_hosts:
            raise ValueError('can not start, %s is running on a remote host' % self.group)
        services = list(service_ports) if service is None else [service]
        if restart:
            self.stop(ins, service)
        else:
            procs = self.get_procs().get(ins, {})
            to_start = []
            for serv in services:
                n = len(procs.get(serv, []))
                if n == 0:
                    to_start.append(serv)
                else:
                    count = '' if n == 1 else ' %sx' % n
                    logger.info('%s %s is already running%s', ins, serv, count)
            services = to_start
        for service_i in services:
            port = service_ports[service_i]
            cmd = self.commands[ins] % dict(ins=ins, serv=service_i, port=port, cfg=cfg, pkg=self.pkg)
            if '%(cfg)s' in self.commands[ins] and not cfg:
                # no cfg given: reuse the command line of a previously stopped process
                cmd = self.stopped[ins].get(service_i)
                if not cmd:
                    if restart and service is None:
                        continue  # silently ignore missing cfg when restarting all services
                    raise UsageError('missing cfg for %s %s' % (ins, service_i))
            wd = os.getcwd()
            try:
                start_dir, env = self.prepare_start(ins, service_i, cfg)
                env = dict(os.environ, **env)
                if start_dir:  # os.chdir('') would raise FileNotFoundError
                    os.chdir(start_dir)
                    if start_dir not in sys.path:
                        sys.path.insert(0, start_dir)
                if wait:
                    proc = subprocess.Popen(cmd.split(), env=env)
                    proc.wait()
                    return
                process = subprocess.Popen(cmd.split(), env=env, start_new_session=True,
                                           stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                if not port:
                    logger.info('%sstarted: %s', 're' if restart else '', cmd)
                    continue
                print_wait = True
                for i in range(15):  # total 15 * 14 / 2 * 0.1 = 10.5 sec
                    returnvalue = process.poll()
                    if returnvalue is not None:
                        logger.info('started process failed, try to find out why')
                        # run the command again, this time capturing its error output
                        process = subprocess.Popen(cmd.split(), env=env, stdout=subprocess.DEVNULL,
                                                   stderr=subprocess.PIPE)
                        try:
                            _, erroutput = process.communicate(timeout=10)
                        except subprocess.TimeoutExpired:
                            process.kill()
                            _, erroutput = process.communicate()
                        logger.error('%s %s died\n%s', ins, service_i,
                                     erroutput.decode(errors='replace'))
                        break
                    try:
                        if print_wait and i > 4:
                            logger.info('wait for port %s', port)
                            print_wait = False
                        s = socket.create_connection(('localhost', port), timeout=5)
                        s.close()
                    except socket.error:
                        time.sleep(0.1 * i)
                        continue
                    logger.info('%sstarted: %s', 're' if restart else '', cmd)
                    break
                else:  # the port never became connectable
                    # pass cmd as argument: the fallback logger would choke on '%' in cmd
                    logger.info('%s', cmd)
                    logger.error('starting failed: %s', cmd)
            finally:
                os.chdir(wd)

    def do_restart(self, ins, service=None, cfg=None, logger=None):
        """stop and start service (or all services) of instance <ins>

        :return: list of instances when <ins> contains a wildcard, else None
        """
        ins_list = self.wildcard(ins)
        if ins_list is not None:
            return ins_list
        self.do_start(ins, service, cfg, True, logger=logger)

    def do_run(self, ins, service=None, cfg=None):
        """for tests: run and wait"""
        if self.wildcard(ins) is not None:
            raise UsageError('no wildcards allowed with %s run' % self.group)
        if not service:
            try:
                service, = self.services
            except ValueError:
                raise UsageError('need service to start (one of %s)' % ', '.join(self.services))
        self.do_start(ins, service, cfg, wait=True)

    def do_list(self, ins=None, *args):
        """info about running services

        :param ins: instance name or wildcard. None: list instances with a
            running service of this group. 'all': list all instances.
        """
        show_unused = ins == 'all'
        if show_unused:
            ins = None
        instances = self.wildcard(ins)
        if instances is None:
            ins_set = {ins}
        else:
            ins_set = set(instances)
        cfginfo = {}
        procs = self.get_procs(self.all, cfginfo)
        rows = []
        # merged[ins][group][service] = port, over all registered service managers
        merged = OrderedDict()
        for group, sm in self.all.items():
            sm.get_info()
            for ins_i, info_dict in sm.info.items():
                if ins_i not in ins_set:
                    continue
                for serv, port in info_dict.items():
                    if ins_i not in merged:
                        merged[ins_i] = {g: {} for g in self.all}
                    merged[ins_i][group][serv] = port
        for ins_i, info_dict in merged.items():
            show_ins = show_unused
            run_info = [[''], [ins_i]]
            procs_dict = procs.get(ins_i, {})
            for group, sm in self.all.items():
                info_grp = info_dict.get(group, {})
                for serv, port in info_grp.items():
                    plist = procs_dict.get(serv)
                    if plist:
                        if ins is None:
                            cfg = ''
                        else:
                            cfg = cfginfo.get((ins_i, serv), '') or sm.get_cfg(ins_i, serv)
                        if sm == self:
                            show_ins = True
                        gs = '%s %s' % (group, serv)
                        port = str(port or '')
                        run_info.append(('', gs, port, cfg))
                        if len(plist) > 1:
                            run_info.append(['', '   WARNING: multiple processes %s'
                                             % ', '.join(str(p.pid) for p in plist)])
                extra = sm.extra_info(ins_i)
                if extra and show_ins:
                    run_info.append(['', extra])
            if show_ins:
                rows.extend(run_info)
        print('')
        printTable(('inst', 'service', 'port', '' if ins is None else 'cfg'), rows, print)

    @staticmethod
    def extra_info(ins):
        """provide extra info or None"""
        return None

    def action(self, action, *args):
        """call do_<action>(*args), and repeat it per instance when a wildcard was given

        :raises UsageError: for an unknown action or a wrong number of arguments
        """
        method = getattr(self, 'do_' + action, None)
        if not callable(method):
            raise UsageError('%s is no valid action' % action)
        try:
            ins_list = method(*args)
        except TypeError as e:
            errtxt = str(e)
            # only a signature mismatch of the do_ method itself is a usage error
            if 'do_%s(' % action in errtxt and 'argument' in errtxt:
                raise UsageError(errtxt) from None
            raise
        # treat wildcards: do_<action> returned the list of matching instances
        for ins in ins_list or ():
            print('\n%s %s %s:' % (action, self.group, ins))
            try:
                method(ins, *args[1:])
            except Exception as e:  # report and continue with the next instance
                print(str(e))

    def do_help(self, *args):
        """print the usage text (self.USAGE, with one %s placeholder)"""
        if self.main_ins:
            usage = self.USAGE.replace(' <instance>', '').replace(' [<instance>]', '') % ''
        else:
            usage = self.USAGE % ('<instance> is one of %s' % ', '.join(self.info))
        print(usage)

    def treat_args(self, argdict, unknown=(), extra=()):
        """convert parsed command line arguments to positional args for action()

        :param argdict: dict possibly containing 'ins' and 'service'
        :param unknown: unknown arguments, raising UsageError when not empty
        :param extra: additional arguments appended to the result (any sequence)
        :return: list [ins, service, *extra]. Without extra, trailing empty
            items are removed.
        """
        if unknown:
            raise UsageError('unknown argument: %s' % (' '.join(unknown)))
        args = [argdict.get('ins'), argdict.get('service')]
        if extra:
            return args + list(extra)  # extra may be a tuple, e.g. the default
        while args and not args[-1]:
            args.pop()
        return args