[wip] auto sample environment change, basic version

This commit is contained in:
2024-04-30 14:41:56 +02:00
parent 09237e4118
commit b10102e052
2 changed files with 298 additions and 213 deletions

View File

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
# NICOS, the Networked Instrument Control System of the MLZ
# Copyright (c) 2009-2018 by the NICOS contributors (see AUTHORS)
@ -30,14 +29,14 @@ connected to a SEC node
import threading
from nicos import config, session
from nicos.core import Override, Param, Moveable, status, POLLER, SIMULATION, DeviceAlias, \
Device, anytype, listof
from nicos.devices.secop.devices import SecNodeDevice, NicosSecopClient
from nicos.core.utils import USER, User, createThread
from nicos.services.daemon.script import RequestError, ScriptRequest
Device, anytype, listof, MASTER
from nicos.devices.secop.devices import SecNodeDevice
from nicos.core.utils import createThread
from nicos.utils.comparestrings import compare
from nicos.devices.secop.devices import get_attaching_devices
from nicos.commands.basic import AddSetup, CreateAllDevices, CreateDevice
from servicemanager import FrappyManager, SeaManager
from nicos.utils import loggers
from servicemanager import FrappyManager, SeaManager, Reconnect, Keep
SERVICES = FrappyManager.services
@ -57,7 +56,7 @@ def applyAliasConfig():
"""
# reimplemented from Session.applyAliasConfig
# apply also when target dev name does not change, as the target device might have
# be exchanged in the mean time
# be exchanged in the meantime
for aliasname, targets in session.alias_config.items():
if aliasname not in session.devices:
continue # silently ignore
@ -92,6 +91,8 @@ def all_info(all_cfg, prefix='currently configured: '):
if cfglist is None:
addkwd = True
else:
# if cfglist is True:
# cfglist = ['reconnect']
if isinstance(cfglist, str):
cfglist = [cfglist]
cfginfo = ','.join(c if isinstance(c, str)
@ -108,7 +109,7 @@ def get_frappy_config():
try:
return session.devices['frappy']
except KeyError:
session.log.error("the frappy device is not available - 'frappy' setup is not loaded")
session.log.exception("the frappy device is not available - 'frappy' setup is not loaded")
return None
@ -135,71 +136,124 @@ class FrappyConfig(Device):
type=listof(str), default=[]),
}
meanings = list(parameters)
meanings.remove('nodes')
_trigger_change = None
meanings = [n for n, p in parameters.items() if p.type is anytype]
_update_setup = 'on_frappy'
_previous_shown = None
_previous_state = None
_initial_config = None
_servers_loaded = False
_initial_info = None
_shutdown_event = None
_restarting = False
_within_update_setup = False
def doInit(self, mode):
if mode != SIMULATION and session.sessiontype != POLLER:
self._trigger_change = threading.Event()
self._shutdown_event = threading.Event()
for name in self.nodes:
secnode = session.devices.get(name)
if secnode:
secnode.uri = ''
createThread('frappy change notification', self.handle_notifications)
createThread('check frappy and sea servers', self.detect_changes)
def handle_notifications(self):
controller = session.daemon_device._controller
while True:
# we do not wait for ever here, because there might be changes
# on an unconnected service
self._trigger_change.wait(15)
self._trigger_change.clear()
while self._trigger_change.wait(2): # triggered again within 2 sec
self._trigger_change.clear()
try:
cfgs = self.check_services()
changes, state, remarks = self.to_consider(cfgs)
if state != self._previous_state and changes:
self._previous_state = state
cmd = 'frappy.has_changed() # inserted automatically when frappy or sea servers changed'
controller.new_request(ScriptRequest(cmd, None, User('guest', USER)))
except RequestError as e:
session.log.error(f'can not queue request {e!r}')
def doShutdown(self):
self._shutdown_event.set()
def to_consider(self, cfgs):
"""return info from sea and frappy servers
for a potential "please consider calling frappy(...)" message
def detect_changes(self):
before_check = before_change = prev_shown = None
cnt = 0
while not self._shutdown_event.wait(1):
busy = bool(session.experiment.scripts or session.daemon_device._controller.queue.scripts)
if busy and cnt < 10:
# check only every 10 sec when busy
cnt += 1
continue
cnt = 0
need_change, changes, fm = self.to_consider()
if fm.state == before_change or not need_change:
continue
if fm.state != before_check:
# must be twice the same
before_check = fm.state
continue
if busy or fm.state == prev_shown:
continue
self.show_changes(changes)
prev_shown = fm.state
def show_changes(self, changes):
if changes is None:
need_change, changes, fm = self.to_consider()
session.log.warning('sample environment has changed:')
args = []
for service, cfg in changes.items():
arg = str(cfg)
if cfg:
if isinstance(cfg, Keep):
arg = f'={cfg}'
session.log.info('keep %s: %s', service, cfg)
else:
session.log.warning('change %s to %s', service, cfg)
elif cfg == '':
session.log.warning('remove %s', service)
args.append(arg)
args = tuple(args)
session.log.warning('use frappy%r or frappy.update() to activate', args)
def to_consider(self, cfgs=None):
"""return info about a proposed changes
:param cfgs: previous configuration
:return: <'need change' flag>, <change dict>, <frappy manager instance>
the information takes into account running frappy and sea servers
and their configuration
"""
error, proposed, state, remarks = FrappyManager().get_server_state(config.instrument, cfgs)
current_cfgs, target_cfgs = self.check_services()
if cfgs is None:
cfgs = current_cfgs
self._current_cfgs = cfgs
fm = FrappyManager()
proposed = fm.get_server_state(config.instrument, cfgs)
changes = dict(proposed)
for service, guess in proposed.items():
if guess is True:
changes.pop(service)
disconnected = set()
for service, info in cfgs.items():
if info == '<disconnected>':
disconnected.add(service)
for service, cfg in proposed.items():
if cfg == 'reconnect': # means: server is running, no restart needed
if service in current_cfgs:
changes.pop(service)
need_change = False
for service in SERVICES:
cfg = changes.get(service) # proposed cfg
info = cfgs.get(service) # running cfg
if not info:
if cfg == '':
changes.pop(service)
cfg = None
if target_cfgs.get(service):
need_change = True
elif info == '<disconnected>':
if not changes.get(service):
changes[service] = ''
return changes, (proposed,) + state, remarks
need_change = True
cfg = ''
if cfg and not isinstance(cfg, Keep):
need_change = True
# elif target_cfgs.get(service) != cfgs.get(service):
# need_change = True
return need_change, changes, fm
def check_services(self):
cfgs = {}
targets = {}
for secnodename in self.nodes:
secnode = session.devices.get(secnodename)
if secnode:
cfgs[secnode.service] = secnode.get_info()
return cfgs
targets[secnode.service] = secnode.target
return cfgs, targets
def start_services(self, main=None, stick=None, addons=None):
def start_services(self, main=None, stick=None, addons=None,):
"""start/stop frappy servers
:param main, stick, addons: cfg for frappy servers, '' to stop, None to keep
for example: start_services(main='xy', stick='')
- restart main server with cfg='xy'
- stop stick server
@ -208,67 +262,71 @@ class FrappyConfig(Device):
in addition, if a newly given cfg is already used on a running server,
this cfg is removed from the server (remark: cfg might be a comma separated list)
"""
services = {'main': main, 'stick': stick, 'addons': addons}
for service, cfg in services.items():
if cfg == '':
seaconn = session.devices.get(f'se_sea_{service}')
if seaconn and seaconn._attached_secnode:
try:
seaconn.communicate('frappy_remove %s' % service)
except Exception:
pass
used_cfg = {}
all_cfg = {}
new_cfg = {}
secnodes = {}
remove_cfg = []
for service, cfginfo in services.items():
secnodes[service] = secnode = session.devices.get('se_' + service)
chkinfo = ''
if secnode:
all_cfg[service] = chkinfo = secnode.get_info()
if cfginfo is not None:
new_cfg[service] = chkinfo = cfginfo
# check cfg is not used twice
for cfg in chkinfo.split(','):
cfg = cfg.strip()
if cfg and cfg != 'restart':
prev = used_cfg.get(cfg)
if prev:
raise ValueError('%r can not be used in both %s and %s' % (cfg, prev, service))
used_cfg[cfg] = service
for service, cfginfo in reversed(list(new_cfg.items())):
if cfginfo != all_cfg.get(service, ''):
secnode = secnodes[service]
self._restarting = True
try:
services = {'main': main, 'stick': stick, 'addons': addons}
to_reconnect = {}
for service, cfg in services.items():
if cfg == '':
seaconn = session.devices.get(f'se_sea_{service}')
if seaconn and seaconn._attached_secnode:
try:
seaconn.communicate('frappy_remove %s' % service)
except Exception:
pass
used_cfg = {}
all_cfg = {}
new_cfg = {}
secnodes = {}
remove_cfg = []
for service, cfginfo in services.items():
secnodes[service] = secnode = session.devices.get('se_' + service)
chkinfo = ''
if secnode:
secnode('') # stop previous frappy server
all_cfg[service] = chkinfo = secnode.get_info()
if cfginfo is not None and (cfginfo != chkinfo or not isinstance(cfginfo, Reconnect)):
new_cfg[service] = chkinfo = cfginfo
if secnode:
to_reconnect[service] = secnode
if new_cfg:
for service, cfginfo in new_cfg.items():
nodename = 'se_' + service
secnode = secnodes[service]
prev = all_cfg.get(service)
if cfginfo != prev:
if cfginfo == 'restart':
cfginfo = prev
if not cfginfo:
continue
# check cfg is not used twice
for cfg in chkinfo.split(','):
cfg = cfg.strip()
if cfg and cfg != 'restart':
prev = used_cfg.get(cfg)
if prev:
raise ValueError('%r can not be used in both %s and %s' % (cfg, prev, service))
used_cfg[cfg] = service
for service, cfginfo in reversed(list(new_cfg.items())):
if cfginfo != all_cfg.get(service, ''):
secnode = secnodes[service]
if secnode:
secnode('') # stop previous frappy server
if new_cfg:
for service, cfginfo in new_cfg.items():
nodename = 'se_' + service
secnode = secnodes[service]
prev = all_cfg.get(service)
if not secnode:
if not cfginfo:
continue
AddSetup('frappy_' + service)
secnode = session.devices[nodename]
secnode(cfginfo)
to_reconnect.pop(service, None)
all_cfg[service] = secnode.get_info()
CreateDevice(nodename)
cleanup_defunct()
CreateAllDevices()
self.set_envlist()
for secnode in remove_cfg:
secnode.disable()
self._cache.put(self, 'config', all_cfg)
cleanup_defunct()
CreateAllDevices()
for secnode in to_reconnect.values():
secnode._secnode.connect()
self.set_envlist()
for secnode in remove_cfg:
secnode.disable()
finally:
self._restarting = False
return all_cfg
def __call__(self, *args, main=None, stick=None, addons=None, force=False):
@ -280,24 +338,29 @@ class FrappyConfig(Device):
- addons are not changed when not given
- frappy(main='<cfg>') # main cfg is changed, but stick is kept
- frappy('restart') # restart all frappy servers
- frappy(stick='restart') # restart stick frappy server
- frappy('reconnect') # reconnect to running frappy servers
"""
stickarg = stick
confirmed = SeaManager().get_cfg(config.instrument, 'sea', True).split('/', 1)[0]
_, changes, fm = to_consider = self.to_consider()
confirmed = fm.sea.get_cfg(config.instrument, 'sea', True).split('/', 1)[0]
if args:
if main is not None:
raise TypeError('got multiple values for main')
main = args[0]
if len(args) == 1: # special case: main given as single argument
if main == 'restart':
stick = 'restart'
addons = 'restart'
main = self._current_cfgs.get('main')
stick = self._current_cfgs.get('stick')
addons = self._current_cfgs.get('addons')
elif main == 'reconnect':
main = None
elif stick is None: # auto stick
if main == '':
stick = '' # remove stick with main
else:
stickcfg = main + 'stick'
if FrappyManager().is_cfg(config.instrument, 'stick', stickcfg):
if fm.is_cfg(config.instrument, 'stick', stickcfg):
# if a default stick is available, start this also
stick = stickcfg
else:
@ -311,7 +374,7 @@ class FrappyConfig(Device):
raise TypeError('got multiple values for addons')
addons = ','.join(alist)
elif main is None and stick is None and addons is None: # bare frappy() command
self.show_config(self.check_services(), True)
self.show_config(None, True, to_consider=to_consider)
return
if confirmed and confirmed != main and main not in (None, 'restart') and not force:
session.log.warning('%r is plugged to the cryostat control rack', confirmed)
@ -320,80 +383,53 @@ class FrappyConfig(Device):
raise TypeError('refuse to override plugged device')
self.show_config(self.start_services(main, stick, addons))
def show_config(self, allcfg, show_server_state=False):
changes, state, remarks = self.to_consider(allcfg)
def show_config(self, allcfg, show_server_state=False, to_consider=None):
need_change, changes, fm = to_consider or self.to_consider(allcfg)
if show_server_state == 'auto':
show_server_state = state != self._previous_shown
show_server_state = fm.state != self._previous_shown and need_change
if show_server_state:
proposed, frappycfgs, seacfgs = state
rows = [['server', 'frappy', 'sea', '']]
for key, remark in remarks.items():
for key, remark in fm.remarks.items():
rows.append([key if key in ('main', 'stick') else 'addons',
frappycfgs.get(key, ''), seacfgs.get(key, ''), remark])
fm.frappy_cfgs.get(key, ''), fm.sea_cfgs.get(key, ''), remark])
wid = [max(len(v) for v in column) for column in zip(*rows)]
# insert title underlines
rows.insert(1, ['-' * w for w in wid[:-1]] + [''])
for row in rows:
session.log.info('%s', ' '.join(v.ljust(w) for w, v in zip(wid, row)))
session.log.info('')
# remove 'frappy.has_changed()' commands in script queue
controller = session.daemon_device._controller
controller.block_requests(r['reqid'] for r in controller.get_queue()
if r['script'].startswith('frappy.has_changed()'))
self._previous_state = self._previous_shown = state
session.log.info(all_info(allcfg))
self._previous_state = self._previous_shown = fm.state
session.log.info(all_info(self._current_cfgs))
if changes:
info = all_info(changes, 'proposed cfg changes: ')
session.log.info(info)
session.log.warning('please consider to call: frappy.update() for doing above changes')
if '?' in info:
session.log.warning("but create cfg files first for items marked with '?'")
if need_change:
self.show_changes(changes)
def update(self):
changes = self.to_consider(self.check_services())[0]
def update(self, *args):
if args:
changes = {k: (Keep(v[1:]) if v.startswith('=') else v) for k, v in zip(SERVICES, args) if v is not None}
else:
changes = self.to_consider()[1]
self.show_config(self.start_services(**changes))
def initial_restart_cfg(self, service):
"""get cfg for (re)start of the service
returns:
cfg, when the server has to (re)started with a new cfg
True, when the server is running and does not need a restart
None, when the server is not running, but does not need a restart
"""
if self._servers_loaded:
return None
if self._initial_config is None:
def get_init_info(self, service):
"""check whether a connect of this service is required"""
if self._initial_info is None:
# we do this only once for all services
fm = FrappyManager()
running = fm.get_cfg(config.instrument, None)
cache = self._getCache()
cfgs = {}
for serv, secnode in zip(fm.services, self.nodes):
cfg = running.get(serv)
if not cfg and cache:
cfg = cache.get(secnode, 'value')
if cfg:
cfgs[serv] = cfg
running_main = running.get('main')
if running_main and running_main != cfgs.get('main'):
# new main device: clear old stick
running_stick = running.get('stick')
if running_stick:
cfgs['stick'] = running_stick
else:
cfgs.pop('stick', None)
error, proposed, state, remarks = fm.get_server_state(config.instrument, cfgs)
self._initial_config = proposed
if not error:
if cache:
cfg = cache.get(secnode, 'previous_config', '')
if cfg:
cfgs[serv] = cfg
self._initial_info = {s: (cfgs.get(s), running.get(s)) for s in fm.services}
fm.get_server_state(config.instrument, cfgs)
if not fm.error:
# do not show server state on startup
self._previous_state = self._previous_shown = state
return self._initial_config.get(service)
def has_changed(self, show_server_state='auto'):
self._servers_loaded = True
self.show_config(self.check_services(), show_server_state)
self._previous_state = self._previous_shown = fm.state
return self._initial_info[service]
def remove_aliases(self):
for meaning in self.meanings:
@ -485,6 +521,9 @@ class FrappyConfig(Device):
session.log.debug('change alias %r -> %r', aliasname, devname)
else:
session.log.debug('create alias %r -> %r', aliasname, devname)
session.cache.put(aliasname, 'visibility', dev.visibility)
session.cache.put(aliasname, 'loglevel', dev.loglevel)
session.cache.put(aliasname, 'description', dev.description)
aliasdev = session.createDevice(aliasname, recreate=True, explicit=True)
aliasdev.alias = devname
if aliasnames:
@ -532,10 +571,13 @@ class FrappyNode(SecNodeDevice, Moveable):
'param_category': Param("category of parameters\n\n"
"set to 'general' if all parameters should appear in the datafile header",
type=str, default='', settable=True),
'quiet_init': Param('flag to set loglevel to error while initializing',
type=bool, default=True, settable=True)
}
_cfgvalue = None
_lastcfg = None
__log_recording = ()
def doStart(self, value):
if value == 'None':
@ -548,16 +590,17 @@ class FrappyNode(SecNodeDevice, Moveable):
else:
fc = session.devices.get('frappy')
if fc:
cfg = fc.initial_restart_cfg(self.service)
if isinstance(cfg, str): # may also be None or True
self.restart(cfg)
if cfg is None: # None means: server is not running, and does not need to be restarted
self._disconnect()
cfg, running = fc.get_init_info(self.service)
self._cfgvalue = running
self._setROParam('target', cfg)
if cfg and (':' not in cfg and cfg != running):
self._set_status(status.ERROR, 'cfg changed')
return
try:
self._connect()
except Exception:
pass
if self.uri:
try:
self._connect()
except Exception:
pass
def doStop(self):
"""never busy"""
@ -568,10 +611,6 @@ class FrappyNode(SecNodeDevice, Moveable):
def createDevices(self):
cfg = self.read()
super().createDevices()
if cfg != self._lastcfg:
fc = get_frappy_config()
if fc:
fc._trigger_change.set()
if self.param_category:
for devname, (_, devcfg) in self.setup_info.items():
params_cfg = devcfg['params_cfg']
@ -581,21 +620,59 @@ class FrappyNode(SecNodeDevice, Moveable):
if not pinfo.category:
pinfo.category = self.param_category
def makeDynamicDevices(self, setup_info):
patched_loggers = []
if self.quiet_init:
for devname, (_, devcfg) in setup_info.items():
log = session.getLogger(devname)
if log not in patched_loggers:
result = [loggers.INFO] # default level
patched_loggers.append((log, result))
log.setLevel(loggers.ERROR)
# avoid level change when the loglevel parameter is treated
# store level instead in result
log.__dict__['setLevel'] = result.append
try:
super().makeDynamicDevices(setup_info)
finally:
for log, result in patched_loggers:
log.__dict__.pop('setLevel', None) # re-enable setLevel
log.setLevel(result[-1]) # set to stored or default value
def showInitLog(self):
for devname, record in self.__log_recording:
session.getLogger(devname).handle(record)
self.__log_recording = ()
def nodeStateChange(self, online, state):
super().nodeStateChange(online, state)
print(f'NODE {self.service} {online} {state}')
# self.log.info('NODE %r %r', online, state)
if online:
super().nodeStateChange(online, state)
if self._cfgvalue is None:
self._cfgvalue = FrappyManager().get_cfg(config.instrument, self.service)
if not self._cfgvalue:
running_cfg = FrappyManager().get_cfg(config.instrument, self.service)
if running_cfg:
if running_cfg != self.target:
self.log.warning(f'server info {running_cfg!r} does not match target cfg {self.target!r}')
self._cfgvalue = running_cfg
else:
self._cfgvalue = self.uri
else:
self._cfgvalue = None
cfg = self.read()
if self._lastcfg != cfg:
self._lastcfg = cfg
fc = get_frappy_config()
if fc:
fc._trigger_change.set()
if self.target == self._cfgvalue:
# SecNodeDevice.nodeStateChange will change status to 'reconnecting'
super().nodeStateChange(online, state)
else:
# do not reconnect
self._cfgvalue = None
def descriptiveDataChange(self, module, description):
running_cfg = FrappyManager().get_cfg(config.instrument, self.service)
if not running_cfg or running_cfg == self.target:
super().descriptiveDataChange(module, description)
else:
self._disconnect(True)
self._cfgvalue = running_cfg
self._set_status(status.ERROR, 'cfg changed')
def disable(self):
seaconn = session.devices.get('sea_%s' % self.service)
@ -615,47 +692,56 @@ class FrappyNode(SecNodeDevice, Moveable):
"""
if cfg is None:
cfg = self._cfgvalue
if cfg is None:
self.log.error('can not restart - previous cfg unknown')
return
ins = config.instrument
fm = FrappyManager()
info = fm.get_ins_info(ins)
running_cfg = fm.get_cfg(ins, self.service) or ''
if cfg is None:
self.log.error('can not restart - previous cfg unknown')
return
if cfg != running_cfg:
self.disable()
if running_cfg:
self._disconnect()
session.log.info('stop frappy_%s %r %r', self.service, running_cfg, cfg)
fm.do_stop(ins, self.service)
is_cfg = cfg and ':' not in cfg
if is_cfg:
available_cfg = None
for cfgitem in cfg.split(','):
if not fm.is_cfg(config.instrument, self.service, cfgitem):
if available_cfg is None:
available_cfg = fm.all_cfg(config.instrument, self.service)
suggestions = suggest(cfgitem, available_cfg)
if suggestions:
session.log.error('%s unknown, did you mean: %s' % (cfgitem, ', '.join(suggestions)))
if available_cfg is not None:
raise ValueError('use "frappy_list()" to get a list of valid frappy configurations')
uri = 'localhost:%d' % info[self.service]
else:
uri = cfg
if uri != self.uri:
self.uri = '' # disconnect
if uri:
try:
is_cfg = cfg and ':' not in cfg
if is_cfg:
fm.do_start(ins, self.service, cfg, logger=self.log)
self.uri = uri # connect
self._cfgvalue = cfg
if self._cache:
self._cache.put(self, 'value', cfg)
self._setROParam('target', cfg)
if cfg == 'reconnect':
is_cfg = False
else:
available_cfg = None
for cfgitem in cfg.split(','):
if not fm.is_cfg(config.instrument, self.service, cfgitem):
if available_cfg is None:
available_cfg = fm.all_cfg(config.instrument, self.service)
suggestions = suggest(cfgitem, available_cfg)
if suggestions:
session.log.error('%s unknown, did you mean: %s' % (cfgitem, ', '.join(suggestions)))
if available_cfg is not None:
raise ValueError('use "frappy_list()" to get a list of valid frappy configurations')
uri = 'localhost:%d' % info[self.service]
else:
uri = cfg
if uri != self.uri:
self.uri = '' # disconnect
if uri:
if is_cfg:
session.log.info('start frappy_%s', self.service)
fm.do_start(ins, self.service, cfg, logger=self.log)
self.uri = uri # connect
self._cfgvalue = cfg
if self._cache:
self._cache.put(self, 'value', cfg)
self._setROParam('target', cfg)
finally:
self._cache.put(self, 'previous_config', self._cfgvalue or self.uri)
def _disconnect(self):
def _disconnect(self, keeptarget=False):
super()._disconnect()
self._setROParam('target', '')
if not keeptarget:
self._setROParam('target', '')
def get_info(self):
result = self.doRead() or ''