Files
frappy_sinq/devices.py

673 lines
28 KiB
Python

# -*- coding: utf-8 -*-
# *****************************************************************************
# NICOS, the Networked Instrument Control System of the MLZ
# Copyright (c) 2009-2018 by the NICOS contributors (see AUTHORS)
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
"""managing SECoP server and connections
SEC Node with added functionality for starting and stopping frappy servers
connected to a SEC node
"""
import threading
from nicos import config, session
from nicos.core import Override, Param, Moveable, status, POLLER, SIMULATION, DeviceAlias, \
Device, anytype, listof
from nicos.devices.secop.devices import SecNodeDevice, NicosSecopClient
from nicos.core.utils import USER, User, createThread
from nicos.services.daemon.script import RequestError, ScriptRequest
from nicos.utils.comparestrings import compare
from nicos.devices.secop.devices import get_attaching_devices
from nicos.commands.basic import AddSetup, CreateAllDevices, CreateDevice
from servicemanager import FrappyManager, SeaManager
SERVICES = FrappyManager.services
def suggest(poi, allowed_keys):
    """Return up to three of *allowed_keys* most similar to *poi*.

    Similarity is the score returned by ``compare``. Only the three best
    ranked keys are considered, and of those only the ones scoring above 2
    are returned (best first).
    """
    scores = {key: compare(poi, key) for key in allowed_keys}
    ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)
    return [key for key, score in ranked[:3] if score > 2]
def applyAliasConfig():
    """Apply the desired aliases from session.alias_config.

    Quieter than the original Session.applyAliasConfig: aliases which do
    not exist as devices are skipped without any message.
    """
    # Reimplemented from Session.applyAliasConfig.
    # The alias is assigned even when the target device name does not change,
    # as the target device might have been exchanged in the meantime.
    for aliasname, targets in session.alias_config.items():
        if aliasname in session.devices:
            aliasdev = session.getDevice(aliasname)
            # highest priority first
            by_priority = sorted(targets, key=lambda t: -t[1])
            for target, _ in by_priority:
                if target not in session.devices:
                    continue
                try:
                    aliasdev.alias = target
                except Exception:
                    session.log.exception("could not set '%s' alias", aliasdev)
                # only the best existing target is tried
                break
def cleanup_defunct():
    """Destroy dynamic devices flagged as defunct.

    A defunct device which still has attaching devices is kept, and a
    warning naming the dependent devices is logged instead.
    """
    # iterate over a copy of the names: entries are removed inside the loop
    for devname in list(session.dynamic_devices):
        dev = session.devices.get(devname)
        if not dev or not dev._defunct:
            continue
        dependents = [d.name for d, _ in get_attaching_devices(dev)]
        if not dependents:
            session.destroyDevice(devname)
            session.dynamic_devices.pop(devname, None)
            continue
        session.log.warning('can not remove device %r due to dependencies on %s'
                            % (devname, ', '.join(dependents)))
def all_info(all_cfg, prefix='currently configured: '):
    """Format *all_cfg* as the text of a ``frappy(...)`` call.

    :param all_cfg: dict <service> -> cfg; a cfg is a string or a list whose
        items are either strings or sequences of alternatives
    :param prefix: text put in front of the result
    :return: the prefix followed by ``frappy(<arguments>)``

    Services are visited in the order of SERVICES. As soon as one service has
    no entry (None), all following arguments are given as keyword arguments.
    """
    def describe(item):
        # a non-string item is a collection of alternatives
        if isinstance(item, str):
            return item
        return f"<one of {', '.join(repr(v) for v in item)}>"

    arguments = []
    need_keyword = False
    for srv in SERVICES:
        cfglist = all_cfg.get(srv)
        if cfglist is None:
            # positional arguments are no longer possible after a gap
            need_keyword = True
            continue
        if isinstance(cfglist, str):
            cfglist = [cfglist]
        cfginfo = ','.join(describe(c) for c in cfglist)
        if need_keyword:
            arguments.append('%s=%r' % (srv, cfginfo))
        else:
            arguments.append(repr(cfginfo))
    return f"{prefix}frappy({', '.join(arguments)})"
def get_frappy_config():
    """Return the 'frappy' device of the session.

    When no such device exists, an error is logged and None is returned.
    """
    if 'frappy' in session.devices:
        return session.devices['frappy']
    session.log.error("the frappy device is not available - 'frappy' setup is not loaded")
    return None
class FrappyConfig(Device):
    """Device for starting / stopping frappy servers and tracking their state.

    Calling the device (``frappy(...)``) (re)starts the frappy servers behind
    the SEC nodes listed in ``nodes``. After a change, aliases and the
    environment list are updated according to the 'meaning' of the SECoP
    modules (see set_envlist).
    """

    # respect the order: e.g. temperature_regulation must be after temperature
    # because it will not be added to envlist when temperature is the same device
    parameters = {
        'temperature': Param(
            'config for sample temperature', type=anytype, default={}),
        'temperature_regulation': Param(
            'config for temperature regulation', type=anytype, default={}),
        'magneticfield': Param(
            'config for magnetic field', type=anytype, default={}),
        'pressure': Param(
            'config for pressure', type=anytype, default={}),
        'rotation_z': Param(
            'config for sample rotation (to be used as a3)',
            type=anytype, default={}),
        'stick_rotation': Param(
            'config for stick rotation (not necessarily to be used as a3)',
            type=anytype, default={}),
        'nodes': Param(
            'list of names of potential SEC nodes',
            type=listof(str), default=[]),
    }

    # every parameter except 'nodes' is the config of one 'meaning'
    meanings = list(parameters)
    meanings.remove('nodes')

    # threading.Event, created in doInit; stays None in simulation / poller
    _trigger_change = None
    # state tuples as returned by to_consider (2nd item), used to detect changes
    _previous_shown = None
    _previous_state = None
    # result of the one-time evaluation in initial_restart_cfg
    _initial_config = None
    _servers_loaded = False

    def doInit(self, mode):
        """Start the notification thread (not in simulation mode / poller)."""
        if mode != SIMULATION and session.sessiontype != POLLER:
            self._trigger_change = threading.Event()
            for name in self.nodes:
                secnode = session.devices.get(name)
                if secnode:
                    secnode.uri = ''
            createThread('frappy change notification', self.handle_notifications)

    def handle_notifications(self):
        """Thread body: queue 'frappy.has_changed()' when servers have changed.

        Runs forever. A check is done after a trigger or at the latest after
        15 s; triggers following each other within 2 s are merged.
        """
        controller = session.daemon_device._controller
        while True:
            # we do not wait for ever here, because there might be changes
            # on an unconnected service
            self._trigger_change.wait(15)
            self._trigger_change.clear()
            while self._trigger_change.wait(2):  # triggered again within 2 sec
                self._trigger_change.clear()
            try:
                cfgs = self.check_services()
                changes, state, remarks = self.to_consider(cfgs)
                if state != self._previous_state and changes:
                    self._previous_state = state
                    cmd = 'frappy.has_changed() # inserted automatically when frappy or sea servers changed'
                    controller.new_request(ScriptRequest(cmd, None, User('guest', USER)))
            except RequestError as e:
                session.log.error(f'can not queue request {e!r}')

    def to_consider(self, cfgs):
        """return info from sea and frappy servers

        for a potential "please consider calling frappy(...)" message

        :param cfgs: dict <service> -> info, as returned by check_services
        :return: tuple (changes, state, remarks)

            - changes: the proposed cfg per service, without the services
              where the proposal is True; a service reported as
              '<disconnected>' without a proposal gets ''
            - state: (proposed,) + the state returned by get_server_state
            - remarks: as returned by get_server_state
        """
        error, proposed, state, remarks = FrappyManager().get_server_state(config.instrument, cfgs)
        changes = dict(proposed)
        for service, guess in proposed.items():
            if guess is True:
                changes.pop(service)
        for service, info in cfgs.items():
            if info == '<disconnected>' and not changes.get(service):
                changes[service] = ''
        return changes, (proposed,) + state, remarks

    def check_services(self):
        """Return a dict <service> -> get_info() of all existing SEC nodes."""
        cfgs = {}
        for secnodename in self.nodes:
            secnode = session.devices.get(secnodename)
            if secnode:
                cfgs[secnode.service] = secnode.get_info()
        return cfgs

    def start_services(self, main=None, stick=None, addons=None):
        """start/stop frappy servers

        for example: start_services(main='xy', stick='')

        - restart main server with cfg='xy'
        - stop stick server
        - do not touch addons server

        a cfg might be a comma separated list. the same cfg item must not be
        used on more than one service, else a ValueError is raised.

        :return: dict <service> -> info about the services after the change
        """
        services = {'main': main, 'stick': stick, 'addons': addons}
        for service, cfg in services.items():
            if cfg == '':
                # NOTE(review): FrappyNode.disable looks up 'sea_<service>',
                # here it is 'se_sea_<service>' - confirm which name is right
                seaconn = session.devices.get(f'se_sea_{service}')
                if seaconn and seaconn._attached_secnode:
                    try:
                        seaconn.communicate('frappy_remove %s' % service)
                    except Exception:
                        # best effort: a failure here must not block the stop
                        pass

        used_cfg = {}  # dict <cfg item> -> service using it
        all_cfg = {}  # info of the existing SEC nodes
        new_cfg = {}  # the explicitly given cfgs
        secnodes = {}
        # never filled in this method: the loop over it below does nothing
        remove_cfg = []
        for service, cfginfo in services.items():
            secnodes[service] = secnode = session.devices.get('se_' + service)
            chkinfo = ''
            if secnode:
                all_cfg[service] = chkinfo = secnode.get_info()
            if cfginfo is not None:
                new_cfg[service] = chkinfo = cfginfo

            # check cfg is not used twice
            for cfg in chkinfo.split(','):
                cfg = cfg.strip()
                if cfg and cfg != 'restart':
                    prev = used_cfg.get(cfg)
                    if prev:
                        raise ValueError('%r can not be used in both %s and %s' % (cfg, prev, service))
                    used_cfg[cfg] = service

        # stop in reverse order, before anything is started
        for service, cfginfo in reversed(list(new_cfg.items())):
            if cfginfo != all_cfg.get(service, ''):
                secnode = secnodes[service]
                if secnode:
                    secnode('')  # stop previous frappy server

        if new_cfg:
            for service, cfginfo in new_cfg.items():
                nodename = 'se_' + service
                secnode = secnodes[service]
                prev = all_cfg.get(service)
                if cfginfo != prev:
                    if cfginfo == 'restart':
                        cfginfo = prev
                        if not cfginfo:
                            continue
                    if not secnode:
                        if not cfginfo:
                            continue
                        # the SEC node device comes with its setup
                        AddSetup('frappy_' + service)
                        secnode = session.devices[nodename]
                    secnode(cfginfo)
                    all_cfg[service] = secnode.get_info()
                    CreateDevice(nodename)
            cleanup_defunct()
            CreateAllDevices()
            self.set_envlist()
            for secnode in remove_cfg:
                secnode.disable()
        self._cache.put(self, 'config', all_cfg)
        return all_cfg

    def __call__(self, *args, main=None, stick=None, addons=None, force=False):
        """(re)start frappy server(s) with given configs and load setup if needed

        - without argument: list running frappy servers, restart failed frappy servers
        - frappy('<cfg>'): if available, the standard stick is added too
        - frappy(''): the stick is removed too
        - addons are not changed when not given
        - frappy(main='<cfg>')  # main cfg is changed, but stick is kept
        - frappy('restart')  # restart all frappy servers
        - frappy(stick='restart')  # restart stick frappy server

        positional arguments are main, stick and any number of addons.
        force=True is needed when main differs from the device reported by
        the sea manager, else a TypeError is raised.
        """
        stickarg = stick  # the stick as given by the caller, for the hint below
        confirmed = SeaManager().get_cfg(config.instrument, 'sea', True).split('/', 1)[0]
        if args:
            if main is not None:
                raise TypeError('got multiple values for main')
            main = args[0]
            if len(args) == 1:  # special case: main given as single argument
                if main == 'restart':
                    stick = 'restart'
                    addons = 'restart'
                elif stick is None:  # auto stick
                    if main == '':
                        stick = ''  # remove stick with main
                    else:
                        stickcfg = main + 'stick'
                        if FrappyManager().is_cfg(config.instrument, 'stick', stickcfg):
                            # if a default stick is available, start this also
                            stick = stickcfg
                        else:
                            stick = ''  # remove stick when main has changed
            else:
                if stick is not None:
                    raise TypeError('got multiple values for stick')
                stick, *alist = args[1:]
                if alist:
                    if addons is not None:
                        raise TypeError('got multiple values for addons')
                    addons = ','.join(alist)
        elif main is None and stick is None and addons is None:  # bare frappy() command
            self.show_config(self.check_services(), True)
            return
        if confirmed and confirmed != main and main not in (None, 'restart') and not force:
            session.log.warning('%r is plugged to the cryostat control rack', confirmed)
            # replace the closing bracket in order to append the force argument
            cmd = all_info({'main': main, 'stick': stickarg, 'addons': addons}, '')[:-1] + ', force=True)'
            session.log.warning('if you are sure, use: %s', cmd)
            raise TypeError('refuse to override plugged device')
        self.show_config(self.start_services(main, stick, addons))

    def show_config(self, allcfg, show_server_state=False):
        """Log the current configuration and proposed changes, if any.

        :param allcfg: dict <service> -> info (see check_services)
        :param show_server_state: True: log a table with the state of the
            servers; 'auto': log it only when the state differs from the one
            shown last
        """
        changes, state, remarks = self.to_consider(allcfg)
        if show_server_state == 'auto':
            show_server_state = state != self._previous_shown
        if show_server_state:
            proposed, frappycfgs, seacfgs = state
            rows = [['server', 'frappy', 'sea', '']]
            for key, remark in remarks.items():
                rows.append([key if key in ('main', 'stick') else 'addons',
                             frappycfgs.get(key, ''), seacfgs.get(key, ''), remark])
            wid = [max(len(v) for v in column) for column in zip(*rows)]
            # insert title underlines
            rows.insert(1, ['-' * w for w in wid[:-1]] + [''])
            for row in rows:
                session.log.info('%s', ' '.join(v.ljust(w) for w, v in zip(wid, row)))
            session.log.info('')
        # remove 'frappy.has_changed()' commands in script queue
        controller = session.daemon_device._controller
        controller.block_requests(r['reqid'] for r in controller.get_queue()
                                  if r['script'].startswith('frappy.has_changed()'))
        self._previous_state = self._previous_shown = state
        session.log.info(all_info(allcfg))
        if changes:
            info = all_info(changes, 'proposed cfg changes: ')
            session.log.info(info)
            session.log.warning('please consider to call: frappy.update() for doing above changes')
            if '?' in info:
                session.log.warning("but create cfg files first for items marked with '?'")

    def update(self):
        """Apply the changes proposed by to_consider and show the result."""
        changes = self.to_consider(self.check_services())[0]
        self.show_config(self.start_services(**changes))

    def initial_restart_cfg(self, service):
        """get cfg for (re)start of the service

        returns:
            cfg, when the server has to (re)started with a new cfg
            True, when the server is running and does not need a restart
            None, when the server is not running, but does not need a restart
        """
        if self._servers_loaded:
            return None
        if self._initial_config is None:
            # we do this only once for all services
            fm = FrappyManager()
            running = fm.get_cfg(config.instrument, None)
            cache = self._getCache()
            cfgs = {}
            for serv, secnode in zip(fm.services, self.nodes):
                cfg = running.get(serv)
                if not cfg and cache:
                    # not running: fall back to the value from the cache
                    cfg = cache.get(secnode, 'value')
                if cfg:
                    cfgs[serv] = cfg
            running_main = running.get('main')
            if running_main and running_main != cfgs.get('main'):
                # new main device: clear old stick
                running_stick = running.get('stick')
                if running_stick:
                    cfgs['stick'] = running_stick
                else:
                    cfgs.pop('stick', None)
            error, proposed, state, remarks = fm.get_server_state(config.instrument, cfgs)
            self._initial_config = proposed
            if not error:
                # do not show server state on startup
                # the remembered state must have the same shape as the state
                # returned by to_consider, otherwise it never compares equal
                self._previous_state = self._previous_shown = (proposed,) + state
        return self._initial_config.get(service)

    def has_changed(self, show_server_state='auto'):
        """Show the configuration after a change of the servers.

        This is the command queued by handle_notifications.
        """
        self._servers_loaded = True
        self.show_config(self.check_services(), show_server_state)

    def remove_aliases(self):
        """Destroy the alias devices configured under 'alias' of all meanings."""
        for meaning in self.meanings:
            info = getattr(self, meaning)
            aliasnames = info.get('alias', [])
            if isinstance(aliasnames, str):
                aliasnames = [aliasnames]
            for aliasname in aliasnames:
                aliasdev = session.devices.get(aliasname)
                # NOTE(review): nesting of the two pop() calls reconstructed
                # from a flattened listing - confirm
                if aliasdev:
                    session.destroyDevice(aliasname)
                    session.configured_devices.pop(aliasname, None)
                    session.dynamic_devices.pop(aliasname, None)

    def get_se_aliases(self):
        """Return a dict <aliasname> -> device of the existing DeviceAlias
        devices configured under 'alias' of all meanings.
        """
        result = {}
        for meaning in self.meanings:
            info = getattr(self, meaning)
            aliasnames = info.get('alias', [])
            if isinstance(aliasnames, str):
                aliasnames = [aliasnames]
            for aliasname in aliasnames:
                aliasdev = session.devices.get(aliasname)
                if isinstance(aliasdev, DeviceAlias):
                    result[aliasname] = aliasdev
        return result

    def set_envlist(self):
        """create aliases and envlist for SECoP devices

        depending on their meaning

        For each meaning, the existing device with the highest importance is
        selected, and the configured aliases are pointed to it. Candidates
        are the SECoP devices with a matching 'meaning' property and the
        devices configured under 'targets'. Aliases no longer needed are
        destroyed and the environment list of the experiment is updated.

        :raises TypeError: when more than one of the devices listed in
            'predefined_alias' exists as alias device
        """
        previous_aliases = self.get_se_aliases()
        # self.remove_aliases()
        nodedevs = filter(None, [session.devices.get(devname) for devname in self.nodes])
        # dict <meaning name> -> list of (importance, devname)
        sample_devices = {}
        for nodedev in nodedevs:
            secnode = nodedev._secnode
            if not secnode:
                continue
            for devname, (_, desc) in nodedev.setup_info.items():
                secop_module = desc['secop_module']
                try:
                    meaning = secnode.modules[secop_module]['properties'].get('meaning')
                except KeyError:
                    meaning = None
                if meaning:
                    meaning_name, importance = meaning
                    sample_devices.setdefault(meaning_name, []).append((importance, devname))

        newenv = {}  # to be added to envlist (dict [devname] of aliasname)
        to_remove = set()  # items to be removed from previous envlist, if present
        for meaning in self.meanings:
            info = getattr(self, meaning)
            aliasnames = info.get('alias')
            if aliasnames is None:
                aliasnames = []
            elif isinstance(aliasnames, str):
                aliasnames = [aliasnames]
            aliascfg = info.get('targets', {})
            predefined_alias = info.get('predefined_alias')
            if predefined_alias:
                aliases = [a for a in predefined_alias
                           if isinstance(session.devices.get(a), DeviceAlias)]
                if aliases:
                    if len(aliases) > 1:
                        raise TypeError(f'do not know to which of {aliases} {meaning} to assign to')
                    # this alias is handled by applyAliasConfig below
                    alias_config = session.alias_config.setdefault(aliases[0], [])
                    alias_config.extend(list(aliascfg.items()))
            elif not aliasnames:
                session.log.warning("neither 'predefined_alias' nor 'alias' configured. skip %s", meaning)
                continue
            importance_list = sample_devices.get(meaning, [])
            importance_list.extend([(nr, nam) for nam, nr in aliascfg.items() if nam in session.devices])
            # highest importance first
            importance_list = sorted(importance_list, reverse=True)
            session.log.debug('%s: %r', meaning, importance_list)
            for _, devname in importance_list:
                dev = session.devices.get(devname)
                if dev is None or info.get('drivable_only', False) and not isinstance(dev, Moveable):
                    continue
                for aliasname in aliasnames:
                    devcfg = ('nicos.core.DeviceAlias', {})
                    session.configured_devices[aliasname] = devcfg
                    session.dynamic_devices[aliasname] = 'frappy'  # assign to frappy setup
                    # aliases remaining in previous_aliases are destroyed below
                    aliasdev = previous_aliases.pop(aliasname, None)
                    if aliasdev:
                        if aliasdev.alias != devname:
                            session.log.debug('change alias %r -> %r', aliasname, devname)
                    else:
                        session.log.debug('create alias %r -> %r', aliasname, devname)
                        aliasdev = session.createDevice(aliasname, recreate=True, explicit=True)
                    aliasdev.alias = devname
                if aliasnames:
                    # only the first item of aliasnames is added to the envlist
                    aliasname = aliasnames[0]
                    to_remove.add(devname)
                    to_remove.add(aliasname)
                    if devname not in newenv and info.get('envlist', True):
                        # example: when 'temperature' and 'temperature_regulation' are the
                        # same device, the first one is kept
                        newenv[devname] = aliasname
                # the first suitable device wins
                break
            else:
                # no suitable device found for this meaning
                to_remove.update(aliasnames)

        for aliasname in previous_aliases:
            session.destroyDevice(aliasname)
            session.configured_devices.pop(aliasname, None)
            session.dynamic_devices.pop(aliasname, None)

        applyAliasConfig()  # for other aliases
        envlist = [k for k in session.experiment.envlist if k not in to_remove] + list(newenv.values())
        if envlist != session.experiment.envlist:
            removed = set(session.experiment.envlist).difference(envlist)
            session.experiment.setEnvironment(envlist)
            if removed:
                session.log.info('removed %s from environment', ', '.join(removed))
            if newenv:
                session.log.info('added %s to environment', ', '.join(newenv.values()))
class FrappyNode(SecNodeDevice, Moveable):
    """SEC node device

    with ability to start / restart / stop the frappy server

    The value / target of the device is the configuration of the frappy
    server or a 'host:port' address. Moving to '' stops the server.
    """

    parameter_overrides = {
        'target': Override(description='configuration for the frappy server or host:port',
                           type=str, default=''),
    }
    parameters = {
        'service': Param('frappy service name (main, stick or addons)', type=str, default=''),
        'param_category': Param("category of parameters\n\n"
                                "set to 'general' if all parameters should appear in the datafile header",
                                type=str, default='', settable=True),
    }

    _cfgvalue = None  # the current cfg, None when unknown
    _lastcfg = None  # the last value seen in nodeStateChange

    def _notify_frappy_config(self):
        """Trigger the change notification of the frappy device, if possible."""
        fc = get_frappy_config()
        # FrappyConfig creates the event neither in simulation mode nor in
        # the poller: _trigger_change is None in these cases
        if fc and fc._trigger_change is not None:
            fc._trigger_change.set()

    def doStart(self, value):
        """(Re)start the frappy server with *value* as cfg ('None': same cfg)."""
        if value == 'None':
            value = None
        self.restart(value)

    def doInit(self, mode):
        """Restart the frappy server, if needed, and connect to it."""
        if mode == SIMULATION or session.sessiontype == POLLER:
            super().doInit(mode)
        else:
            fc = session.devices.get('frappy')
            if fc:
                cfg = fc.initial_restart_cfg(self.service)
                if isinstance(cfg, str):  # may also be None or True
                    self.restart(cfg)
                if cfg is None:  # None means: server is not running, and does not need to be restarted
                    self._disconnect()
                    return
            try:
                self._connect()
            except Exception:
                # best effort: a failing connection must not break the init
                pass

    def doStop(self):
        """never busy"""

    def doRead(self, maxage=0):
        """Return the current cfg ('' when unknown)."""
        return self._cfgvalue or ''

    def createDevices(self):
        """Create the devices of the node and apply param_category.

        The frappy device is notified when the cfg differs from the one seen
        in the last node state change.
        """
        cfg = self.read()
        super().createDevices()
        if cfg != self._lastcfg:
            self._notify_frappy_config()
        if self.param_category:
            for devname, (_, devcfg) in self.setup_info.items():
                params_cfg = devcfg['params_cfg']
                dev = session.devices[devname]
                for pname in params_cfg:
                    pinfo = dev.parameters[pname]
                    # do not override an already given category
                    if not pinfo.category:
                        pinfo.category = self.param_category

    def nodeStateChange(self, online, state):
        """Update the cfg value on a state change of the node.

        The frappy device is notified when the value has changed.
        """
        super().nodeStateChange(online, state)
        if online:
            if self._cfgvalue is None:
                self._cfgvalue = FrappyManager().get_cfg(config.instrument, self.service)
                if not self._cfgvalue:
                    # no cfg known: use the uri as value
                    self._cfgvalue = self.uri
        else:
            self._cfgvalue = None
        cfg = self.read()
        if self._lastcfg != cfg:
            self._lastcfg = cfg
            self._notify_frappy_config()

    def disable(self):
        """Send 'frappy_remove <service>' to the sea connection, if available,
        and refresh the status.
        """
        # NOTE(review): FrappyConfig.start_services looks up
        # 'se_sea_<service>', here it is 'sea_<service>' - confirm
        seaconn = session.devices.get('sea_%s' % self.service)
        if seaconn and seaconn._attached_secnode:
            seaconn.communicate('frappy_remove %s' % self.service)
        self._set_status(*self._status)

    def _set_status(self, code, text):
        """Set the status, which is always 'disabled' for an empty uri."""
        if self.uri == '':
            code, text = status.DISABLED, 'disabled'
        SecNodeDevice._set_status(self, code, text)

    def restart(self, cfg=None):
        """restart frappy server

        :param cfg: config for frappy server, if not given, restart with the same config

        a cfg containing ':' is taken as the address (host:port) of a server
        which is not started from here.

        :raises ValueError: when an item of cfg is not a valid configuration
        """
        if cfg is None:
            cfg = self._cfgvalue
        ins = config.instrument
        fm = FrappyManager()
        info = fm.get_ins_info(ins)
        running_cfg = fm.get_cfg(ins, self.service) or ''
        if cfg is None:
            self.log.error('can not restart - previous cfg unknown')
            return
        # NOTE(review): the nesting below is reconstructed from a flattened
        # listing - confirm that all of it depends on cfg != running_cfg
        if cfg != running_cfg:
            self.disable()
            if running_cfg:
                self._disconnect()
                fm.do_stop(ins, self.service)
            is_cfg = cfg and ':' not in cfg
            if is_cfg:
                available_cfg = None  # determined on the first invalid item only
                for cfgitem in cfg.split(','):
                    if not fm.is_cfg(config.instrument, self.service, cfgitem):
                        if available_cfg is None:
                            available_cfg = fm.all_cfg(config.instrument, self.service)
                        suggestions = suggest(cfgitem, available_cfg)
                        if suggestions:
                            session.log.error('%s unknown, did you mean: %s' % (cfgitem, ', '.join(suggestions)))
                if available_cfg is not None:
                    # at least one item was invalid
                    raise ValueError('use "frappy_list()" to get a list of valid frappy configurations')
                uri = 'localhost:%d' % info[self.service]
            else:
                uri = cfg
            if uri != self.uri:
                self.uri = ''  # disconnect
            if uri:
                if is_cfg:
                    fm.do_start(ins, self.service, cfg, logger=self.log)
                self.uri = uri  # connect
        self._cfgvalue = cfg
        if self._cache:
            self._cache.put(self, 'value', cfg)
        self._setROParam('target', cfg)

    def _disconnect(self):
        """Disconnect and clear the target."""
        super()._disconnect()
        self._setROParam('target', '')

    def get_info(self):
        """Return the cfg, with a remark when the status is not OK.

        '<disconnected>' is returned when no cfg is known but a target is set.
        """
        result = self.doRead() or ''
        code, text = self.status()
        if not result and self.target:
            return '<disconnected>'
        if code == status.OK or result == '':
            return result
        if (code, text) == (status.ERROR, 'reconnecting'):
            return '%s (frappy not running)' % result
        return '%s (%s)' % (result, text)

    def doFinish(self):
        return False  # avoid warning in finish() when target does not match