Files
servicemanager/nicosman.py
T
2026-02-17 14:31:24 +01:00

299 lines
11 KiB
Python

# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
import os
import sys
import shutil
from pathlib import Path
from glob import glob
from os.path import join, abspath, dirname, basename, expanduser, exists, islink
from configparser import ConfigParser
from servicemanager.base import ServiceManager, UsageError
# Names of environment entries of interest to NICOS. In the visible part of
# this file the set is referenced only by the commented-out 'environment'
# section in NicosManager.do_create.
ENV_KEYS = {
    # NICOS services
    'NICOS_CACHE_PORT',
    'NICOS_DAEMON_PORT',
    # frappy servers
    'FRAPPY_MAIN_PORT',
    'FRAPPY_STICK_PORT',
    'FRAPPY_ADDONS_PORT',
    # others
    'SEA_PORT',
    'PYTHONPATH',
}
def copy_all(srcdir, dstdir):
    """Copy all regular files directly inside srcdir to dstdir.

    Entries are looked up with the pattern ``*``, so names starting with a
    dot are not matched. Anything that is not a regular file (a
    subdirectory, a broken link) is skipped instead of aborting the copy
    half way.

    :param srcdir: directory to copy from; a missing directory yields no files
    :param dstdir: existing directory to copy into
    :return: list of the source paths that were copied
    """
    # local import: only ``glob`` itself is imported at file level
    from glob import escape

    copied = []
    # escape srcdir, so that characters like '[' in it are taken literally
    for src in glob(join(escape(srcdir), '*')):
        if not os.path.isfile(src):
            continue
        shutil.copy2(src, dstdir)
        copied.append(src)
    return copied
class NicosManager(ServiceManager):
    """Service manager for the NICOS services (cache, daemon, poller).

    On top of the generic ServiceManager actions, this class implements the
    creation of nicos.conf (do_create), linking and copying of NICOS data
    (do_link, do_copy) and starting the NICOS clients (do_cli, do_gui).
    """
    group = 'nicos'
    services = ('cache', 'daemon', 'poller')
    # the text is kept flush left: its whitespace is part of the usage message
    USAGE = """
Usage:
nicos gui <instance>
nicos <instance> (the same as above)
nicos list [<instance>]
nicos start <instance> [<service>]
nicos restart <instance> [<service>] %(remark)s
nicos stop <instance> [<service>] %(remark)s
nicos create <instance> <nr>
nicos link <instance> (create links to nicos data and scripts)
<service> is one of cache, daemon, poller
%(legend)s
to be done after the experiment:
nicos copy (copy data and scripts from link)
nicos copy [<instance> [<year>/<proposal>]] (copy specific data)
"""

    def get_info(self):
        """Get the service info and complete 'NICOS_PACKAGE' where missing.

        For each instance without 'NICOS_PACKAGE' in its environment, the
        package is the name of the single directory <package> for which
        <NICOS_ROOT>/<package>/<instance> exists. A message is printed when
        no or more than one such directory is found.

        :return: self.info
        """
        super().get_info()
        for ins, env in self.env.items():
            root = env.get('NICOS_ROOT') or os.environ['NICOS_ROOT']
            if 'NICOS_PACKAGE' in env:
                continue
            candidates = glob(join(root, '*', ins))
            if not candidates:
                print('%s not found in any package' % ins)
            elif len(candidates) > 1:
                print('ambiguos package: %s' % ', '.join(candidates))
            else:
                env['NICOS_PACKAGE'] = basename(dirname(candidates[0]))
        return self.info

    def do_create(self, ins, *args):
        """Write nicos.conf and guiconfig.py for the given instance(s).

        nicos.conf is only rewritten when its content differs from the
        expected one.

        :param ins: instance name or wildcard; with the special value 'check'
            the result of self.wildcard(None) is used and printed
        :param args: not used here
        """
        if ins == 'check':
            ins_list = self.wildcard(None)
            print(ins_list)
        else:
            ins_list = self.wildcard(ins)
            if ins_list is None:
                ins_list = [ins]
        self.get_info()
        for ins_i in ins_list:
            env = self.env[ins_i]
            # same fallback to the process environment as in get_info
            root = env.get('NICOS_ROOT') or os.environ['NICOS_ROOT']
            base = Path(root) / env['NICOS_PACKAGE'] / ins_i
            nicos_conf = base / 'nicos.conf'
            content = {
                'nicos': {
                    'setup_subdirs': '["%s", "linse_nicos", "frappy_sinq"]' % ins_i,
                    'logging_path': '"%s/%s"' % (env['NICOS_LOG'], ins_i),
                    'pid_path': '"%s/%s"' % (env['NICOS_LOG'], ins_i),
                },
                # 'environment': {
                #     key: f'"{env[key]}"' for key in env if key in ENV_KEYS
                # }
            }
            existing = ConfigParser()
            existing.optionxform = str  # keep the case of option names
            # read() skips a missing or unreadable file: no sections then
            existing.read(nicos_conf)
            if set(existing.sections()) == set(content):
                oldcontent = {key: dict(existing[key]) for key in content}
            else:
                oldcontent = None
            if content != oldcontent:
                cp = ConfigParser()
                cp.optionxform = str
                os.makedirs(base, exist_ok=True)
                for key, sdict in content.items():
                    cp[key] = sdict
                with open(nicos_conf, 'w') as fd:
                    cp.write(fd)
            # NOTE(review): copied for every instance, also when nicos.conf
            # was already up to date - confirm this is the intended nesting
            shutil.copyfile(Path(__file__).parent / 'cfg' / 'guiconfig.py',
                            base / 'guiconfig.py')

    @staticmethod
    def extra_info(ins):
        """Return a text with the current NICOS data directory of an instance.

        The directory is <NICOS_DATA>/<ins>/<target of the link 'current'>,
        with '.' used when NICOS_DATA is not set.

        :param ins: instance name
        :return: 'nicos data: <dir>' or None when the link can not be read
        """
        try:
            datadir = join(os.environ.get('NICOS_DATA', '.'), ins)
            datadir = join(datadir, os.readlink(join(datadir, 'current')))
        except OSError:
            # missing link (FileNotFoundError) or 'current' is not a symlink
            return None
        return 'nicos data: %s' % datadir

    @staticmethod
    def linked_dir(current_wd):
        """Return the target of the symlink 'data' in current_wd.

        :param current_wd: directory to look for the link
        :return: the link target, or None when there is no 'data' entry
        :raises ValueError: when 'data' exists but is not a symlink
        """
        link = join(current_wd, 'data')
        if islink(link):
            # a relative target is relative to the directory containing the
            # link, an absolute target is returned unchanged by join
            return join(current_wd, os.readlink(link))
        if exists(link):
            raise ValueError('%s is not a symlink' % link)
        return None

    @staticmethod
    def handle_linked(target, wd=None):
        """Show, and with wd also update, the directories linking to target.

        The list is kept in the file 'data_links.txt' in the parent
        directory of target, one directory per line. Directories which no
        longer contain a symlink 'data' are dropped from the list.

        :param target: the linked data directory; when empty, only a message
            is printed
        :param wd: a directory to be added to the list; the file is only
            rewritten when wd is given
        """
        if not target:
            print('no links from %s' % os.getcwd())
            return
        targetdir = abspath(join(target, '..'))
        analist_file = join(targetdir, 'data_links.txt')
        linked_from = []
        if exists(analist_file):
            with open(analist_file) as f:
                for line in f:
                    line = line.strip()
                    # wd is appended below, do not list it twice
                    if line and line != wd and islink(join(line, 'data')):
                        linked_from.append(line)
        if wd:
            linked_from.append(wd)
        listing = '\n'.join(linked_from)
        if wd:
            with open(analist_file, 'w') as f:
                f.write('%s\n' % listing)
        print('links to %s from:\n%s' % (target, listing))

    @staticmethod
    def make_symlink(ins):
        """Create the symlink 'data' in the current working directory.

        The link points to <NICOS_DATA>/<ins>/<target of 'current'>/data and
        replaces an already existing symlink 'data'.

        :param ins: instance name
        :raises ValueError: when 'data' exists but is not a symlink
        """
        data = join(os.environ['NICOS_DATA'], ins)
        target = join(data, os.readlink(join(data, 'current')), 'data')
        link = join(os.getcwd(), 'data')
        if islink(link):
            os.remove(link)
        elif exists(link):
            raise ValueError('%s is not a symlink' % link)
        os.symlink(target, link, True)
        NicosManager.handle_linked(target, os.getcwd())

    @staticmethod
    def copy_linked(datadir=None):
        """Copy NICOS data and scripts to the current working directory.

        Files from <datadir>/data go to ./<a>_<b>_data and files from
        <datadir>/scripts to ./<a>_<b>_data/scripts, where <a>/<b> are the
        last two path components of datadir.

        :param datadir: directory containing 'data' and 'scripts'. When
            omitted, it is the parent of the target of the symlink ./data
        :raises ValueError: when datadir is omitted and ./data is missing,
            is not a symlink or does not point to a directory named 'data'
        """
        if datadir:
            # an explicitly given directory wins over a link in the cwd, so
            # that data and scripts are always taken from the same place
            src = join(datadir, 'data')
        else:
            src = NicosManager.linked_dir(os.getcwd())
            if not src:
                raise ValueError('missing data dir')
            if src.rsplit('/', 1)[-1] != 'data':
                raise ValueError('%s is already a copy' % src)
            datadir = dirname(src)
        dst = join(os.getcwd(), '%s_%s_data' % tuple(datadir.rsplit('/', 2)[-2:]))
        os.makedirs(dst, exist_ok=True)
        n = len(copy_all(src, dst))
        print('copy %d files to %s' % (n, dst))
        # copy scripts
        src = join(datadir, 'scripts')
        dst = join(dst, 'scripts')
        os.makedirs(dst, exist_ok=True)
        n = len(copy_all(src, dst))
        print('copy %d files to %s' % (n, dst))

    @staticmethod
    def do_link(ins, *args):
        """make/show symlink to nicos data in current wd

        :param ins: instance name; None just shows the existing link
        :param args: not used here
        """
        if ins is None:
            NicosManager.handle_linked(NicosManager.linked_dir(os.getcwd()))
        else:
            NicosManager.make_symlink(ins)
        # NOTE(review): exits in both cases - confirm the intended nesting
        sys.exit(0)

    @staticmethod
    def do_copy(ins, year_prop=None, *args):
        """copy nicos data to current wd

        :param ins: instance name; None copies from the symlink ./data
        :param year_prop: '<year>/<proposal>' or '<proposal>'; in the latter
            case the last match in sorted order of <NICOS_DATA>/<ins>/*/<proposal>
            is used. When empty, the target of the link 'current' is copied
        :param args: not used here
        :raises ValueError: when no directory is found for a proposal given
            without year
        """
        if ins is None:
            NicosManager.copy_linked()
            return
        data = join(os.environ['NICOS_DATA'], ins)
        if year_prop:
            year, _, proposal = year_prop.rpartition('/')
            if year:
                src = join(data, year, proposal)
            else:
                # get proposal from most recent year
                matches = sorted(glob(join(data, '*', proposal)))
                if not matches:
                    raise ValueError('no data found for proposal %s of %s'
                                     % (proposal, ins))
                src = matches[-1]
        else:
            src = join(data, os.readlink(join(data, 'current')))
        NicosManager.copy_linked(src)

    def prepare_start(self, ins, service, *args):
        """Extend the start environment of the base class for NICOS.

        Sets 'INSTRUMENT' to '<NICOS_PACKAGE>.<ins>' and lets 'NICOS_START'
        from the environment, if present, override the start directory.

        :param ins: instance name
        :param service: service name
        :param args: not used here
        :return: tuple (start_dir, env)
        """
        start_dir, env = super().prepare_start(ins, service)
        env['INSTRUMENT'] = '%s.%s' % (env['NICOS_PACKAGE'], ins)
        return env.get('NICOS_START', start_dir), env

    def prepare_client(self, ins):
        """Set up the process for running a NICOS client in it.

        Updates os.environ with the environment of the daemon service,
        changes to <NICOS_ROOT>/<NICOS_PACKAGE> and puts NICOS_ROOT at the
        front of sys.path.

        :param ins: instance name
        :raises UsageError: when ins is a wildcard
        """
        if self.wildcard(ins):
            raise UsageError('wildcards not allowed')
        self.check_running(ins, 'daemon')
        env = self.prepare_start(ins, 'daemon')[1]
        os.environ.update(env)
        nicos_root = os.environ['NICOS_ROOT']
        os.chdir(join(nicos_root, env['NICOS_PACKAGE']))
        sys.path.insert(0, nicos_root)

    def run_client(self, ins, main, clientapp, **kwargs):
        """Run a client main function and exit with its result.

        The connection argument is <login>@<host>:<port>, with login and
        host from the environment variables REMOTE_LOGIN (default
        'guest:guest') and REMOTE_HOST (default 'localhost') and the port
        from self.info[ins]['daemon'].

        :param ins: instance name
        :param main: the function to call with sys.argv and kwargs
        :param clientapp: program name to put into sys.argv[0]
        :param kwargs: passed on to main
        """
        serverhost = os.environ.get('REMOTE_HOST') or 'localhost'
        login = os.environ.get('REMOTE_LOGIN') or 'guest:guest'
        address = '%s@%s:%d' % (login, serverhost, self.info[ins]['daemon'])
        sys.argv[:] = [clientapp, address]
        sys.exit(main(sys.argv, **kwargs))

    def do_cli(self, ins):
        """Start the NICOS command line client for an instance.

        :param ins: instance name
        """
        self.prepare_client(ins)
        # must be imported after prepare_client has extended sys.path
        from nicos.clients.cli import main
        os.environ['NICOS_HISTORY_FILE'] = expanduser('~/.nicoshistory_%s' % ins)
        self.run_client(ins, main, 'nicos-client')

    def do_gui(self, ins):
        """Start the NICOS GUI client for an instance.

        :param ins: instance name
        """
        self.prepare_client(ins)
        # patch QApplication to use 'nicos_<instr>' instead of 'nicos' as organizationName
        # for different settings and log files
        import nicos.guisupport.qt

        class QApplication(nicos.guisupport.qt.QApplication):
            def __init__(self, *args, organizationName=None, **kwds):
                super().__init__(*args, organizationName=f'nicos_{ins}', **kwds)

        nicos.guisupport.qt.QApplication = QApplication
        from nicos.clients.gui.main import main
        print('starting nicos gui %s' % ins)
        # run_client ends with sys.exit, the next variant is only tried
        # when the call raised TypeError, e.g. on an unknown keyword
        variants = (
            {'postfix': '_' + ins},
            # treat legacy cases
            {'userpath': expanduser('~/.config/nicos_%s' % ins)},
        )
        for kwargs in variants:
            try:
                self.run_client(ins, main, 'nicos-gui', **kwargs)
                return
            except TypeError:
                pass
        self.run_client(ins, main, 'nicos-gui')

    def treat_args(self, argdict, unknown=(), extra=()):
        """Treat the single argument after 'create' as extra argument.

        For the action 'create' with exactly one unknown argument, this
        argument is passed to the base class as 'extra' instead of
        'unknown'. All other cases are passed on unchanged.
        """
        if argdict['action'] == 'create' and len(unknown) == 1:
            return super().treat_args(argdict, (), unknown)
        return super().treat_args(argdict, unknown, extra)