# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
#   Markus Zolliker
#
# *****************************************************************************

import json
import os
import re
import socket
import subprocess
import sys
import time
from collections import OrderedDict, defaultdict
from configparser import ConfigParser
from os.path import basename, exists, expanduser

import psutil


class ServiceDown(Exception):
    """the service is not running"""


class UsageError(Exception):
    """wrong usage of a command (bad or missing arguments)"""


def printTable(headers, items, printfunc, minlen=0, rjust=False):
    """Print tabular information nicely formatted.

    Rows may have fewer columns than the header. The last item of such a row
    may extend over the remaining columns: its length is not taken into
    account for the column widths.
    Stolen from nicos.utils and modified.

    :param headers: column titles, or empty/None for no header lines
    :param items: list of rows, each a sequence of strings
    :param printfunc: called with each formatted line
    :param minlen: minimum width of every column
    :param rjust: right-justify the item rows (headers are always left-justified)
    """
    if not headers and not items:
        return
    ncolumns = len(headers or items[0])
    rowlens = [minlen] * ncolumns
    for row in [headers or [], *items]:
        # do not consider length of last column when row has less columns
        cntrow = row if len(row) == ncolumns else row[:-1]
        for i, item in enumerate(cntrow):
            rowlens[i] = max(rowlens[i], len(item))
    rfmtstr = ('%%%ds ' * ncolumns) % tuple(rowlens)
    lfmtstr = ('%%-%ds ' * ncolumns) % tuple(rowlens)
    if headers:
        printfunc(lfmtstr % tuple(headers))
        printfunc(lfmtstr % tuple('=' * n for n in rowlens))
    rowfmt = rfmtstr if rjust else lfmtstr
    for row in items:
        # pad short rows with empty strings to satisfy the format string
        printfunc(rowfmt % (tuple(row) + ('',) * (ncolumns - len(row))))


def get_config():
    """read ~/servicemanager.cfg (case sensitive option names, no interpolation)"""
    parser = ConfigParser(interpolation=None)
    parser.optionxform = str  # keep the case of option names
    parser.read(expanduser('~/servicemanager.cfg'))
    return parser


class _PrintLogger:
    """minimal logger replacement printing to stdout, default for do_start"""

    @staticmethod
    def info(fmt, *args):
        print(fmt % args)

    @staticmethod
    def error(fmt, *args):
        print(('ERROR: ' + fmt) % args)


class ServiceManager:
    """manage the services (processes) of one group for several instances

    the base class values (group = None, USAGE = None) suggest that
    subclasses are expected to define at least group, services and USAGE
    """
    services = ['SINGLE']
    need_cfg = False
    start_dir = None
    group = None
    # class level on purpose: registry of all service managers,
    # the list command shows the services of all of them
    all = {}
    virtualenv = None
    pkg = ''
    # class level on purpose: dict[<raw command>] of <group> for all managers,
    # used by get_procs to recognize running processes
    revcmd = {}
    USAGE = None
    main_ins = None
    # when this directory exists, it is prepended to PATH of started processes
    nicosenv_bin = '/home/nicos/nicos/nicosenv/bin/'

    def __init__(self):
        self.env = {}  # dict[<ins>] of environment variables from the config
        self.remote_hosts = {}  # dict[<ins>] of REMOTE_HOST
        self.commands = {}  # dict[<ins>] of command format string
        self.info = {}  # dict[<ins>] of dict[<service>] of port
        self.all[self.group] = self
        self.get_info()
        # dict[<ins>] of dict[<service>] of command line of stopped processes,
        # reused by do_start when a cfg is needed but not given
        self.stopped = defaultdict(dict)

    def get_services(self, section):
        """return the enabled services of a config section with their ports

        a service is enabled when <GROUP>_<SERVICE>_PORT or <GROUP>_PORT is
        given, or when <GROUP>_<SERVICE> is a true JSON value (its port is
        then 0). 'nr' in a port is replaced by the two digit number given in
        section[<group>], and section[<GROUP>_<SERVICE>_PORT] is updated.

        :return: dict[<service>] of int port
        """
        ports = {}
        nr = '%02d' % int(section[self.group])
        gr = self.group.upper()
        singlekey = gr + '_PORT'
        for service in self.services:
            sv = gr + '_' + service.upper()
            key = '%s_PORT' % sv
            port = section.get(key) or section.get(singlekey)
            if port or json.loads(section.get(sv, '0').lower()):
                # e.g. NICOS_POLLER = True leads to port = 0
                port = (port or '0').replace('nr', nr)
                ports[service] = int(port)
                if key in section:  # update replaced nr
                    section[key] = port
        return ports

    def get_info(self):
        """(re)read the config file and update the instance information

        sets self.info to dict[<ins>] of dict[<service>] of port (0 when the
        service has no port) and updates self.commands, self.env and
        self.remote_hosts. Only sections containing the option <group>
        contribute to info, commands and env.
        """
        result = OrderedDict()
        parser = get_config()
        self.commands = {}

        def get_subs(section, key, ins, nr):
            """get item from section and substitute nr or expand filename"""
            value = section.get(key)
            if key.endswith('_PORT'):
                return value.replace('nr', nr)
            value = value.replace('<ins>', ins)
            # every ':' separated element may be a path and may contain %(ins)s
            return ':'.join(expanduser(p % dict(ins=ins)) for p in value.split(':'))

        for ins in parser.sections():
            section = dict(parser[ins])
            if ins == 'MAIN':
                ins = self.main_ins = os.environ.get('Instrument') or basename(expanduser('~'))
            if 'REMOTE_HOST' in section:
                self.remote_hosts[ins] = section['REMOTE_HOST']
            command = section.get('%s_command' % self.group)
            if command is not None:
                # a None key would break pattern creation in get_procs
                self.revcmd[command] = self.group
            nr = section.get(self.group)
            if nr is None:
                continue  # this instance has no services of this group
            nr = '%02d' % int(nr)
            services = self.get_services(section)
            env = {k: get_subs(section, k, ins, nr) for k in section if k.isupper()}
            result[ins] = services
            self.env[ins] = env
            cmd = command.replace('~', expanduser('~'))
            if cmd.startswith('PY '):
                cmd = env.get('PY', 'python3') + cmd[2:]
            self.commands[ins] = cmd
        self.info = result

    def get_ins_info(self, ins=None):
        """re-read the config and return dict[<service>] of port for ins

        ins defaults to the environment variable Instrument; {} is returned
        when neither is given. Raises KeyError for an unknown instance.
        """
        self.get_info()
        if ins is None:
            ins = os.environ.get('Instrument')
            if ins is None:
                return {}
        return self.info[ins]

    def get_cfg(self, ins, service):
        """return cfg info about running programs, if relevant

        example for sea: return samenv name
        if service is None return a dict[<service>] of cfg
        ('' when unknown or not running)
        """
        cfginfo = {}
        self.get_procs([self.group], cfginfo)
        if service is None:
            return {s: cfginfo.get((ins, s)) or '' for s in self.services}
        return cfginfo.get((ins, service)) or ''

    def get_procs(self, groups=None, cfginfo=None):
        """return running processes matching the commands of the given groups

        :param groups: collection of group names (a single name is accepted too),
                       None for the own group only
        :param cfginfo: if not None, populated with dict[(<ins>, <service>)] of cfg
                        for commands containing %(cfg)s
        :result: a dict[<ins>] of dict[<service>] of list of psutil.Process
        """
        result = {}
        cmdpatterns = []
        if groups is None:
            groups = [self.group]
        elif isinstance(groups, str):
            groups = [groups]  # avoid substring matching with 'in'
        for cmd, group in self.revcmd.items():
            if not cmd or group not in groups:
                continue
            prog_args = cmd.split(None, 1)
            if len(prog_args) < 2:
                continue  # a command without arguments can not be recognized
            cmdpatterns.append(re.compile(
                '.* ' +  # do not match prog, as it might be modified by the os
                (prog_args[1] % dict(ins=r'(?P<ins>\S*)', serv=r'(?P<serv>\S*)',
                                     cfg=r'(?P<cfg>\S*)', port=r'\S*', pkg=r'\S*'))))
        for p in psutil.process_iter(attrs=['pid', 'cmdline']):
            cmdline = p.info['cmdline']
            if not cmdline:
                continue  # e.g. no access or kernel thread
            cmd = ' '.join(cmdline)
            for cmdpat in cmdpatterns:
                match = cmdpat.match(cmd)
                if match:
                    gdict = match.groupdict()
                    ins = gdict.get('ins', self.main_ins)
                    serv = gdict.get('serv', '')
                    if cfginfo is not None and 'cfg' in gdict:
                        cfginfo[ins, serv] = gdict['cfg']
                    result.setdefault(ins, {}).setdefault(serv, []).append(p)
        return result

    def wildcard(self, ins):
        """return a list of the instances matching the pattern ins

        '.' and '*' in ins stand for 0 or more arbitrary characters, the
        pattern has to match the whole instance name. An empty ins or 'all'
        matches all instances.

        :return: list of matching instance names, or None when ins contains
                 no wildcard character
        """
        if not ins or ins == 'all':
            return list(self.info)
        parts = re.split(r'[.*]', ins)
        if len(parts) == 1:
            return None
        # escape everything except the wildcards themselves
        pat = re.compile('.*'.join(re.escape(part) for part in parts))
        return [k for k in self.info if pat.fullmatch(k)]

    def check_running(self, ins, service):
        """raise ServiceDown when service of ins is not running

        not checked for instances on a remote host.
        Raises KeyError for an unknown instance.
        """
        self.get_info()
        if ins not in self.info:
            raise KeyError("don't know %r" % ins)
        sp_ins = ' ' + ins if ins != self.main_ins else ''
        if ins in self.remote_hosts:
            return
        if not self.get_procs().get(ins, {}).get(service):
            startcmd = '%s start%s' % (self.group, sp_ins)
            raise ServiceDown('%s%s is not running - please use %r' % (self.group, sp_ins, startcmd))

    def stop(self, ins, service=None):
        """stop service (or all services) of instance ins

        the services are stopped in the reverse order of self.services

        :return: list of matching instances when ins is a wildcard,
                 else True when at least one process was found to stop
        """
        ins_list = self.wildcard(ins)
        if ins_list is not None:
            return ins_list
        procs = self.get_procs().get(ins, {})
        done = False
        services = self.services if service is None else [service]
        for serv in reversed(services):
            for proc in procs.get(serv, []):
                action = self._stop_process(proc, ins, serv)
                done = True
                print('%s %s %s' % (ins, serv, (action + 'ed').replace('eed', 'ed')))
        return done

    def _stop_process(self, proc, ins, service):
        """terminate one process, kill it when it does not stop in time

        on success the command line is remembered in self.stopped

        :return: 'terminate' or 'kill' (the action which stopped the process)
                 or 'kill fail'
        """
        print_wait = True
        for action in ('terminate', 'kill'):
            try:
                getattr(proc, action)()  # proc.terminate() or proc.kill()
            except psutil.NoSuchProcess:
                # the process vanished already: treat it as stopped
                self.stopped[ins][service] = ' '.join(proc.info['cmdline'])
                return action
            for i in range(10):  # total 0.1 * 10 * 9 / 2 = 4.5 sec
                try:
                    proc.wait(0.1 * i)
                except psutil.TimeoutExpired:
                    try:
                        alive = proc.status() != psutil.STATUS_ZOMBIE
                    except psutil.NoSuchProcess:
                        alive = False  # process stopped in the meantime
                    if alive:
                        if print_wait and i > 4:
                            print('wait for %s %s' % (ins, service))
                            print_wait = False
                        continue
                except psutil.NoSuchProcess:
                    pass  # process stopped in the meantime
                # the process is gone (or a zombie): remember how it was started
                self.stopped[ins][service] = ' '.join(proc.info['cmdline'])
                return action
        return 'kill fail'

    def do_stop(self, ins, service=None, *args):
        """stop service (or all services) of instance ins

        :return: list of matching instances when ins is a wildcard, else None
        """
        if not ins:
            raise UsageError('need instrument or "all" to stop all')
        self.get_info()
        ins_list = self.wildcard(ins)
        if ins_list is not None:
            return ins_list
        if not self.stop(ins, service):
            print('nothing to stop')

    def prepare_start(self, ins, service, cfg=''):
        """hook called by do_start before starting a service

        :return: tuple (start directory, environment of ins from the config);
                 the start directory is <GROUP>_ROOT, '' when not configured
        """
        if not ins:
            raise UsageError('need instance')
        if ins not in self.env:
            self.get_info()
        gr = self.group.upper()
        env = self.env[ins]
        return env.get('%s_ROOT' % gr, ''), env

    def do_start(self, ins, service=None, cfg='', restart=False, wait=False, logger=None, opts=''):
        """start service (or all services) of instance ins

        :param ins: instance name, 'all' or wildcard pattern
        :param service: the service to start, None for all services of ins
        :param cfg: substituted for %(cfg)s in the command; when needed but
                    empty, the command line of the last stopped process is reused
        :param restart: stop the service(s) before starting
        :param wait: run in the foreground and wait for the process to finish
        :param logger: object with info(fmt, *args) and error(fmt, *args),
                       default: print to stdout
        :param opts: extra options appended to the command
        :return: list of matching instances when ins is a wildcard, else None
        """
        if not ins:
            raise UsageError('need instrument or "all" to start all')
        ins_list = self.wildcard(ins)
        if ins_list is not None:
            return ins_list
        if logger is None:
            logger = _PrintLogger
        try:
            service_ports = self.get_ins_info(ins)
        except (KeyError, ValueError):
            raise UsageError('do not know %r' % ins) from None
        if ins in self.remote_hosts:
            raise UsageError('can not start, %s is running on a remote host' % self.group)
        services = list(service_ports) if service is None else [service]
        if restart:
            self.stop(ins, service)
        else:
            running = self.get_procs().get(ins, {})
            to_start = []
            for serv in services:
                n = len(running.get(serv, []))
                if n == 0:
                    to_start.append(serv)
                else:
                    count = '' if n == 1 else ' %sx' % n
                    logger.info('%s %s is already running%s', ins, serv, count)
            services = to_start
        for service_i in services:
            try:
                port = service_ports[service_i]
            except KeyError:
                raise UsageError('%s %s is not configured' % (ins, service_i)) from None
            # TODO: remove unused pkg
            cmd = self.commands[ins] % dict(ins=ins, serv=service_i, port=port, cfg=cfg,
                                            pkg=self.pkg or ins)
            if opts:
                cmd = f'{cmd} {opts}'
            print(cmd)
            if '%(cfg)s' in self.commands[ins] and not cfg:
                # no cfg given: reuse the command line of the last stopped process
                cmd = self.stopped[ins].get(service_i)
                if not cmd:
                    if restart and service is None:
                        continue  # silently ignore missing cfg when restarting all services
                    raise UsageError('missing cfg for %s %s' % (ins, service_i))
            wd = os.getcwd()
            try:
                start_dir, env = self.prepare_start(ins, service_i, cfg)
                env = dict(os.environ, **env, Instrument=ins)
                if start_dir:  # os.chdir('') would raise FileNotFoundError
                    os.chdir(start_dir)
                    if start_dir not in sys.path:
                        sys.path.insert(0, start_dir)
                if exists(self.nicosenv_bin):
                    path = env.get('PATH')
                    env['PATH'] = f'{self.nicosenv_bin}:{path}' if path else self.nicosenv_bin
                if wait:
                    proc = subprocess.Popen(cmd.split(), env=env)
                    # do not abort on KeyboardInterrupt, but give up waiting after the third
                    for _ in range(3):
                        try:
                            proc.wait()
                            break
                        except KeyboardInterrupt:
                            pass
                    return
                process = subprocess.Popen(cmd.split(), env=env, start_new_session=True,
                                           stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                if not port:
                    logger.info('%sstarted: %s', 're' if restart else '', cmd)
                    continue
                print_wait = True
                for i in range(25):  # total 25 * 24 / 2 * 0.1 = 30 sec
                    if process.poll() is not None:
                        logger.info('started process failed, try to find out why')
                        # run again, this time capturing stderr to report the reason
                        process = subprocess.Popen(cmd.split(), env=env,
                                                   stdout=subprocess.DEVNULL,
                                                   stderr=subprocess.PIPE)
                        try:
                            _, erroutput = process.communicate(timeout=10)
                        except subprocess.TimeoutExpired:
                            process.kill()
                            _, erroutput = process.communicate()
                        logger.error('%s %s died\n%s', ins, service_i,
                                     erroutput.decode(errors='replace'))
                        break
                    if print_wait and i > 4:
                        logger.info('wait for port %s', port)
                        print_wait = False
                    try:
                        # the service is considered started when its port accepts connections
                        with socket.create_connection(('localhost', port), timeout=5):
                            pass
                    except OSError:
                        time.sleep(0.1 * i)
                        continue
                    logger.info('%sstarted: %s', 're' if restart else '', cmd)
                    break
                else:
                    logger.error('starting failed: %s', cmd)
            finally:
                os.chdir(wd)

    def do_restart(self, ins, service=None, cfg=None, logger=None):
        """stop and start service (or all services) of instance ins

        :return: list of matching instances when ins is a wildcard, else None
        """
        if not ins:
            raise UsageError("need instrument or 'all' or wildcard")
        ins_list = self.wildcard(ins)
        if ins_list is not None:
            if cfg is not None:
                raise UsageError('can not restart several instances with %s' % cfg)
            return ins_list
        self.do_start(ins, service, cfg, True, logger=logger)

    def do_run(self, ins, service=None, cfg=None, opts=''):
        """for tests: run and wait"""
        if self.wildcard(ins) is not None:
            raise UsageError('need instrument and service for "%s run"' % self.group)
        if not service:
            try:
                service, = self.services
            except ValueError:
                raise UsageError('need service to start (one of %s)'
                                 % ', '.join(self.services)) from None
        self.do_start(ins, service, cfg, wait=True, opts=opts)

    def do_list(self, ins=None, *args):
        """info about running services

        shows the running services of all registered service managers.
        ins may be an instance, a wildcard pattern, empty (the instances of
        this manager) or 'all' (also show instances without running services)
        """
        show_unused = ins == 'all'
        if show_unused:
            ins = None
        instances = self.wildcard(ins)
        ins_set = {ins} if instances is None else set(instances)
        cfginfo = {}
        procs = self.get_procs(self.all, cfginfo)
        rows = []
        # dict[<ins>] of dict[<group>] of dict[<service>] of port
        merged = OrderedDict()
        for group, sm in self.all.items():
            sm.get_info()
            for ins_i, info_dict in sm.info.items():
                if ins_i not in ins_set:
                    continue
                for serv, port in info_dict.items():
                    if ins_i not in merged:
                        merged[ins_i] = {g: {} for g in self.all}
                    merged[ins_i][group][serv] = port
        for ins_i, info_dict in merged.items():
            show_ins = show_unused
            run_info = [[''], [ins_i]]
            procs_dict = procs.get(ins_i, {})
            for group, sm in self.all.items():
                for serv, port in info_dict.get(group, {}).items():
                    plist = procs_dict.get(serv)
                    if not plist:
                        continue
                    cfg = cfginfo.get((ins_i, serv), '') or sm.get_cfg(ins_i, serv)
                    if sm == self or instances is None:
                        show_ins = True
                    run_info.append(('', '%s %s' % (group, serv), str(port or ''), cfg))
                    if len(plist) > 1:
                        pids = ', '.join(str(p.pid) for p in plist)
                        run_info.append(['', ' WARNING: multiple processes %s' % pids])
                extra = sm.extra_info(ins_i)
                if extra and show_ins:
                    run_info.append(['', extra])
            if show_ins:
                rows.extend(run_info)
        print('')
        printTable(('inst', 'service', 'port', 'cfg'), rows, print)

    def do_getports(self, ins):
        """machine readable list of running services with port numbers

        prints '<service> <port>' pairs separated by spaces; running services
        which are not configured are skipped
        """
        self.get_info()
        ports = self.info.get(ins, {})
        running = self.get_procs().get(ins, {})
        print(' '.join('%s %s' % (k, ports[k]) for k in running if k in ports))

    @staticmethod
    def extra_info(ins):
        """provide extra info or None"""
        return None

    def action(self, action, *args):
        """call do_<action>(*args), repeated for every instance of a wildcard

        when the do_ method returns a list of instances (for a wildcard ins),
        it is called again for each of them; their errors are printed only
        """
        method = getattr(self, 'do_' + action, None)
        if not callable(method):
            raise UsageError('%s is no valid action' % action)
        try:
            ins_list = method(*args)
        except TypeError as e:
            errtxt = str(e)
            # wrong number of arguments for the do_ method
            if 'do_%s(' % action in errtxt and 'argument' in errtxt:
                raise UsageError(errtxt) from None
            raise
        # treat wildcards:
        for ins in ins_list or ():
            print('\n%s %s %s:' % (action, self.group, ins))
            try:
                method(ins, *args[1:])
            except Exception as e:
                print(str(e))

    def do_help(self, *args):
        """print self.USAGE with the instance placeholders filled in"""
        options = {'ins': '<instance> ', 'optional_ins': '[instance] ', 'remark': '*', 'legend': ''}
        wildcards = True
        if self.main_ins:
            if len(self.info) == 1:
                # only one instance configured: do not mention instances at all
                options = dict.fromkeys(options, '')
                options['remark'] = ' '
                wildcards = False
            else:
                options['legend'] = ' [instance] is empty or one of %s\n' % ', '.join(self.info)
        else:
            options['legend'] = ' <instance> is one of %s\n' % ', '.join(self.info)
        if wildcards:
            options['legend'] += (" * wildcards allowed, using '.' to replace 0 or more"
                                  " arbitrary characters in <instance>\n")
        print(self.USAGE % options)

    def treat_args(self, argdict, unknown=(), extra=()):
        """return the positional argument list [ins, service, *extra]

        :param argdict: dict with optional items 'ins' and 'service'
        :param unknown: unrecognized arguments, UsageError is raised when not empty
        :param extra: additional arguments appended after ins and service
        :return: without extra, trailing empty items are removed
        """
        if unknown:
            raise UsageError('unknown argument: %s' % (' '.join(unknown)))
        args = [argdict.get('ins'), argdict.get('service')]
        if extra:
            return args + list(extra)
        while args and not args[-1]:
            args.pop()
        return args