Files
linsetools/frappy.py
T
2026-04-23 15:59:30 +02:00

292 lines
11 KiB
Python

import re
from pathlib import Path
from .base import MarcheControl, Logger, ArgError, write_content
# Template for the content of a wrapper cfg file (see add_frappy_service):
# it defines the interface (port), includes the real cfg file and overrides
# the node's interface with that port.
WRAPPER_CFG = (
    "interface = '{port}'\n"
    "include({cfg!r})\n"
    "overrideNode(interface=interface)\n"
)

# Matches the first line of a wrapper file; group 1 is the port as a string
# of digits (used by get_cfg_info to recover the port from a wrapper file).
WRAPPER_PAT = re.compile(r"interface\s=\s*'(\d*)'\s*\n")
class Config:
    """lazy access to frappy's cfg file parser

    The frappy imports and generalConfig.init() are deferred to the first
    call of get(); the parser function and the logger are then cached as
    class attributes.
    """
    log = None  # logger passed to process_file, created on first use
    process_file = None  # frappy.config.process_file, imported on first use

    @classmethod
    def get(cls, cfgfile):
        """process a frappy cfg file

        :param cfgfile: path of the cfg file (anything accepted by Path)
        :return: the result of frappy.config.process_file for this file
        """
        needs_setup = not cls.process_file
        if needs_setup:
            # import here, so that frappy is only needed when cfg details
            # are actually requested
            import logging
            from frappy.config import process_file
            from frappy.lib import generalConfig

            generalConfig.init()
            cls.log = logging.getLogger('linsetools')
            cls.process_file = process_file
        path = Path(cfgfile)
        return cls.process_file(path, cls.log)
class FrappyControl(MarcheControl):
    """control of frappy servers

    A frappy service is registered by writing a wrapper cfg file into
    ``wrapperdir``. The wrapper binds a real cfg file to a port (see
    WRAPPER_CFG). Starting, stopping, reloading and status queries are
    done through methods not defined here (start, stop, restart, reload,
    status, list), presumably provided by MarcheControl.
    """
    # the known services, each mapped to the argument kind 'service'
    services = {k: 'service' for k in ('main', 'stick', 'addons')}
    argmap = dict(MarcheControl.argmap, all='service', **services)
    otherarg = 'cfg'

    def __init__(self, instance, host='localhost', port=None, user=None, config=None):
        """set up from the 'superfrappy' config section and the instance config

        :raises ValueError: when frappy_ports is not configured or the
            wrapper directory does not exist
        """
        super().__init__(instance, host, port, user, config)
        superconfig = self.config.sections['superfrappy']
        self.instance = instance
        # NOTE: pop() removes these keys from the section object
        self.wrapperdir = superconfig.pop('wrapperdir')
        self.cfgdirs = superconfig.pop('cfgdirs')  # ':' separated directories
        frappy_ports = self.ins_config.get('frappy_ports')
        if not frappy_ports:
            # fail with a clear message instead of an AttributeError on None
            raise ValueError('frappy_ports is not configured')
        ports = frappy_ports.split('-')
        # NOTE(review): the upper bound is exclusive, i.e. '5000-5010' gives
        # ports 5000..5009 and a single port gives an empty list - confirm
        # that this is intended
        self.frappy_ports = list(range(int(ports[0]), int(ports[-1])))
        self.frappy_servers = [f'{host}:{p}' for p in self.frappy_ports]
        if not Path(self.wrapperdir).is_dir():
            raise ValueError(f'{self.wrapperdir} does not exist')
        self.get_cfg_info()  # do we need to update this from time to time?

    def get_service(self, instance):
        """return the service name for <instance>"""
        if self.instance == 'this':
            return f'frappy.{instance}'
        return f'frappy.{self.instrument}-{instance}'

    def wrapper_file(self, cfg):
        """return the path of the wrapper file for <cfg>"""
        return Path(self.wrapperdir) / f'{cfg}_cfg.py'

    def cfg_file(self, cfgdirs, service, cfg):
        """find the cfg file for <cfg>

        :param cfgdirs: ':' separated cfg directories or None for self.cfgdirs
        :param service: service to look in or None/'' for all services
        :param cfg: cfg name (without _cfg.py) or an explicit path
            (recognized by containing a '/')
        :return: tuple (<service dir>, <path of cfg file>)
            <service dir> is '' when the file is in no service subdirectory
        :raises FileNotFoundError: when no matching file exists

        For an explicit path the existence of the file is not checked. When
        no service is given, it is taken from the name of the parent
        directory if this is a known service.
        """
        if '/' in cfg:
            # always return a tuple: callers unpack (service, cfgfile)
            cfgfile = Path(cfg)
            if not service and cfgfile.parent.name in self.services:
                service = cfgfile.parent.name
            return service or '', cfgfile
        cfgpy = f'{cfg}_cfg.py'
        tries = []
        services = [service] if service else list(self.services)
        services.append('')  # finally try the cfg directories themselves
        cfgdirs = cfgdirs or self.cfgdirs
        for servicedir in services:
            for cfgdir in cfgdirs.split(':'):
                cfgfile = Path(cfgdir) / servicedir / cfgpy
                tries.append(cfgfile)
                if cfgfile.is_file():
                    return servicedir, cfgfile
        raise FileNotFoundError(f'can not find {cfgpy} in {tries}')

    def get_std_port(self, service):
        """return the port for main or stick, else the list of remaining ports"""
        if service == 'main':
            return self.frappy_ports[0]
        if service == 'stick':
            return self.frappy_ports[1]
        return self.frappy_ports[2:]

    def get_local_ports(self, run_state=None):
        """return a dict <cfg> of <port> for servers which are on or busy

        :param run_state: dict <cfg> of (on, busy, ...) or None to get it
            from self.status('frappy')

        Only wrapper files registered on localhost are considered.
        """
        self.get_cfg_info()
        if run_state is None:
            # an empty dict given by the caller is a valid state and must
            # not trigger a second status query
            run_state = self.status('frappy')
        by_port = {}
        for host_port, cfg in self.cfg_info.items():
            host, port = host_port.split(':')
            if host != 'localhost':
                continue
            on, busy, _ = run_state.get(cfg, (0, 0, 0))
            if on or busy:
                by_port.setdefault(int(port), []).append((on, busy, cfg))
        # per port, take the cfg of the entry sorting last by (on, busy, cfg)
        return {sorted(states)[-1][-1]: port for port, states in by_port.items()}

    def get_port(self, service):
        """return the port to be used for a new server of <service>

        :raises ArgError: on an unknown service
        :raises ValueError: when all addon ports are in use
        """
        if service not in {'main', 'stick', 'addons', 'addon'}:
            raise ArgError('illegal service argument')
        ports = self.get_std_port(service)
        if isinstance(ports, int):
            return ports
        # get_local_ports returns <cfg> -> <port>: compare with the ports
        # (its values), not with the cfg names (its keys)
        used_ports = set(self.get_local_ports().values())
        for port in ports:
            if port not in used_ports:
                return port
        raise ValueError('too many frappy servers')

    def add_frappy_service(self, service, cfg, port, log=None):
        """register a frappy service by writing its wrapper file

        :param service: service or None when to be determined from the
            location of the cfg file
        :param cfg: cfg name or path
        :param port: port or None for choosing a port by service
        :param log: logger (default: Logger('info'))
        :return: the uri 'localhost:<port>'
        :raises ArgError: when neither port nor service can be determined
        """
        if log is None:
            log = Logger('info')
        log.info('add %r port=%r', cfg, port)
        found_service, cfgfile = self.cfg_file(self.cfgdirs, service, cfg)
        # keep the service given by the caller when the file is not located
        # in a service subdirectory
        service = found_service or service
        if not port:
            if not service:
                raise ArgError('service is not given and can not be determined from cfg file location')
            port = self.get_port(service)
        wrapper_content = WRAPPER_CFG.format(cfg=str(cfgfile), port=port)
        cfgname = cfgfile.stem.removesuffix('_cfg')
        wrapper = self.wrapper_file(cfgname)
        write_content(wrapper, wrapper_content, group='+rw')
        self.get_cfg_info()
        log.info('wrapper %r %r', wrapper, wrapper_content)
        self.reload()
        log.info('registered %r', cfgname)
        return f'localhost:{port}'

    def get_cfg_info(self):
        """get info from wrapper dir

        sets self.cfg_info: a dict 'localhost:<port>' of <cfg>
        """
        result = {}
        for cfgfile in Path(self.wrapperdir).glob('*_cfg.py'):
            cfg = cfgfile.stem[:-4]  # strip '_cfg'
            match = WRAPPER_PAT.match(cfgfile.read_text())
            if match:
                result[f'localhost:{match.group(1)}'] = cfg
        self.cfg_info = result

    def delete_frappy_service(self, cfg):
        """remove the wrapper file of <cfg> and reload, if it exists"""
        try:
            self.wrapper_file(cfg).unlink()
        except FileNotFoundError:
            return  # nothing registered: no reload needed
        # outside of try: an error in reload must not be swallowed
        self.reload()

    def running(self):
        """return a dict <name> of (kind, uri) of running servers

        kind is the index of the port in self.frappy_ports, or
        len(self.frappy_ports) when the port is not in this list
        """
        run_state = self.status('frappy')
        result = {}
        for cfg, port in self.get_local_ports(run_state).items():
            # cfg is always in run_state, as get_local_ports only returns
            # cfgs found there as on or busy
            on, _busy, _ = run_state[cfg]
            if on:
                kind = (self.frappy_ports + [port]).index(port)
                result[cfg] = kind, f'localhost:{port}'
        return result

    def cli(self):
        """start the interactive frappy client on all registered servers"""
        self.get_cfg_info()
        from frappy.client.interactive import init, interact
        # from frappy.protocol.discovery import scan
        # if ins == self.single_ins:
        #     all_nodes = {}
        #     for node in nodes:
        #         host, port = node.split(':')
        #         if host == 'localhost':
        #             host = gethostname()
        #         all_nodes[gethostbyname(host), int(port)] = node
        #     for a in scan():
        #         all_nodes.setdefault((a.address, a.port), f'{a.hostname}:{a.port}')
        #     nodes = list(all_nodes.values())
        init(*self.cfg_info)  # the keys are the uris 'localhost:<port>'
        interact(appname=self.instrument)

    def gui(self):
        """start the frappy gui on all registered servers

        :return: the result of the Qt event loop
        """
        self.get_cfg_info()
        nodes = list(self.cfg_info)
        import logging
        from frappy.gui.qt import QApplication
        from frappy.gui.mainwindow import MainWindow
        app = QApplication([])
        # stand-in for parsed command line arguments
        args = type('args', (), dict(detailed=True, node=nodes))
        win = MainWindow(args, logging.getLogger('gui'))
        win.show()
        return app.exec_()

    @staticmethod
    def get_cfg_details(cfgfile):
        """get details of a cfg file

        :param cfgfile: path of the cfg file
        :return: tuple (<stripped node description>, <sea cfg or None>)

        The sea cfg is config['config']['value'] of a module with a class
        name ending with 'SeaClient' (the last one, if there are several).
        """
        mods = Config.get(cfgfile)
        node = mods.pop('node') or {}
        sea_cfg = None
        for _modname, config in mods.items():
            cls = config['cls']
            # cls may be a class or the name of a class
            cls = getattr(cls, '__name__', cls)
            if cls.endswith('SeaClient'):
                try:
                    sea_cfg = config['config']['value']
                except KeyError:
                    sea_cfg = None
        return node.get('description', '').strip(), sea_cfg

    def all_cfg(self, ins, service, details=False):
        """get available cfg files

        :param ins: instance
        :param service: service or None for all services
        :param details: get details about relation to sea
        :return: set of available cfgs (empty when ins is not given)

        implicit results (only when details is True):
        self.frappy2sea: a dict <frappycfg> of <seacfg> (including extension .config/.stick/.addon)
        self.sea2frappy: a dict <seacfg> of set of <frappycfg>
        self.list_info: dict <cfgdir> of <cfg> of <description>
        """
        all_cfg = set()
        if details:
            # initialize before the early return below, so that the
            # attributes exist in any case
            self.frappy2sea = f2s = {}
            self.sea2frappy = s2f = {}
            self.list_info = list_info = {}
        if not ins:
            return all_cfg
        for servicedir in [service] if service else self.services:
            for cfgdir in self.cfgdirs.split(':'):
                cfgdir = Path(cfgdir) / servicedir
                for cfgfile in cfgdir.glob('*_cfg.py'):
                    cfg = cfgfile.name[:-7]  # strip '_cfg.py'
                    if details:
                        try:
                            desc, sea_cfg = self.get_cfg_details(cfgfile)
                        except TypeError:
                            raise
                        except Exception as e:
                            # show the error instead of the description
                            sea_cfg = None
                            desc = repr(e)
                        if cfg not in all_cfg:
                            # the first cfg file found for a name wins
                            if sea_cfg:
                                # sea_info.setdefault(sea_cfg, set()).add(cfg)
                                f2s[cfg] = sea_cfg
                                s2f.setdefault(sea_cfg, set()).add(cfg)
                            list_info.setdefault(cfgdir, {})[cfg] = desc.split('\n', 1)[0]
                    all_cfg.add(cfg)
        # if service == 'main':
        #     sea_info['none.config'] = {''}
        return all_cfg

    def listcfg(self, service='', prt=print):
        """list the available cfgs with their descriptions

        :param service: service or ''/None for all services
        :param prt: function called with each line to be printed

        cfgs needing sea are marked with '*', or with '!' when several
        frappy cfgs refer to the same sea cfg
        """
        self.all_cfg(self.instrument, service, True)
        seacfgpat = re.compile(r'(.*)(\.config|\.stick|\.addon)')
        keylen = max((max(len(k) for k in cfgs) for cfgs in self.list_info.values()), default=1)
        ambiguous = set()
        for cfgdir, cfgs in self.list_info.items():
            if not cfgs:
                continue
            prt('')
            prt('--- %s:' % cfgdir)
            for cfg, desc in sorted(cfgs.items(), key=lambda v: (v[0].lower(), v)):
                seacfg = self.frappy2sea.get(cfg)
                if seacfg:
                    match = seacfgpat.match(seacfg)
                    # a sea cfg without known extension is shown as it is
                    name, ext = match.groups() if match else (seacfg, '')
                    if name == cfg or name + 'stick' == cfg:
                        prefix = '* '
                    else:
                        prefix = f'* ({name}{ext}) '
                    if len(self.sea2frappy[seacfg]) > 1:
                        prefix = '!' + prefix[1:]
                        ambiguous.add(seacfg)
                    desc = prefix + desc
                prt('%s %s' % (cfg.ljust(keylen), desc))
        prt(' ')
        gap = ' ' * keylen
        prt(f'{gap} * need sea')
        if ambiguous:
            prt(f'{gap} ! {len(ambiguous)} ambiguous mappings sea -> frappy')

    def do(self, action=None, service=None, cfg=None):
        """perform <action>

        :param action: one of 'start', 'restart', 'stop', 'list', 'listcfg',
            'gui', 'cli' or None (same as 'cli')
        :param service: the service, where applicable
        :param cfg: the cfg, where applicable
        :raises ArgError: on an unknown action
        """
        if action == 'start':
            self.add_frappy_service(service, cfg, None)
            self.start(cfg)
        elif action == 'restart':
            self.restart(cfg)
        elif action == 'stop':
            self.stop(cfg)
        elif action == 'list':
            self.list(service or 'frappy')
        elif action == 'listcfg':
            self.listcfg(service)
        elif action == 'gui':
            self.gui()
        elif action in ('cli', None):
            self.cli()
        else:
            raise ArgError(f'unknown action {action!r}')