Files
2026-04-07 10:33:45 +02:00

585 lines
22 KiB
Python

# *****************************************************************************
# NICOS, the Networked Instrument Control System of the MLZ
# Copyright (c) 2009-2018 by the NICOS contributors (see AUTHORS)
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
"""managing SECoP server and connections
SEC Node with added functionality for starting and stopping frappy servers
connected to a SEC node
"""
import sys
linsepath = '/sq_sw/linse/frappy'
if linsepath not in sys.path:
sys.path.insert(0, linsepath)
import re
from pathlib import Path
from nicos import config, session
from nicos.core import Override, Param, Moveable, status, POLLER, SIMULATION, DeviceAlias, \
Device, Readable, anytype, listof, MASTER, Attach
from frappy_sinq.secop.devices import SecNodeDevice, SecopDevice, DefunctDevice, SecopWritable, NicosSecopClient
from nicos.core.utils import createThread
from nicos.utils.comparestrings import compare
from nicos.devices.secop.devices import get_attaching_devices
from nicos.utils import loggers
from nicos.commands.basic import RemoveSetup, AddSetup
from linsetools.frappy import FrappyControl
SETUP_TEMPLATE = """description = '%(desc)s'
group = 'optional'
display_order = 50.5
devices = {
'secnode_%(nodename)s':
device('frappy_sinq.new.FrappyNode',
description='SECoP connection to %(nodename)s', unit='', async_only=True,
prefix='%(prefix)s', auto_create=True,
uri=%(uri)s, visibility_level=2,
general_stop_whitelist=['om', 'stickrot'],
%(nodeargs)s),
}
%(additions)s
startupcode = '''
try:
frappy.adjustEnvironment()
except NameError:
printinfo("please load also the 'frappy' setup")
'''
"""
MEANINGS = {
'temperature': 'Ts',
'temperature_regulation': 'T',
'magneticfield': 'B',
'pressure': 'p',
'rotation_z': 'a3',
'stick_rotation': 'dom',
}
SKIP_ENV = 'rotation_z', 'dom'
SETUPDIR = '/home/linse/setups'
IDSUB = re.compile('\W+|^(?=\d)')
NOT_USED = 'not_used'
NAME_URI = re.compile(r'([^=]+)=(.+)')
def shorten(secnode_list):
result = ','.join(('' if v == '0' else v.split('=')[0]) for v in secnode_list)
return result[:-1] if result.endswith(',') else result
class Service(str):
pass
class Main(Service):
name = 'main'
class Stick(Service):
name = 'stick'
class Addons(Service):
name = 'addons'
def uri_with_kind(kind, uri):
if kind >= 2:
return Addons(uri)
return Stick(uri) if kind else Main(uri)
def secnodes_from_string(value):
result = {}
for kind, item in enumerate(value.split()):
match = NAME_URI.match(item)
if match:
name, uri = match.groups()
result[name] = uri_with_kind(kind, uri)
return result
def secnodes_to_string(secnodes):
secnode_list = ['0', '0']
for name, uri in secnodes.items():
item = f'{name}={uri}'
if isinstance(uri, Main):
secnode_list[0] = item
elif isinstance(uri, Stick):
secnode_list[1] = item
else:
secnode_list.append(item)
return ' '.join(secnode_list)
def shorten(value):
result = ','.join('' if v == '0' else v.split('=')[0]for v in value.split())
return result[:-1] if result.endswith(',') else result
def get_meanings(modname, moddesc):
meaning = moddesc['properties'].get('meaning')
result = {}
if meaning:
meaning_name, importance = meaning
if meaning_name not in MEANINGS:
session.log.warning('%s: meaning %r is unknown', modname, meaning_name)
return result
result.setdefault(meaning_name, []).append((importance, modname))
if meaning_name == 'temperature_regulation':
# add temperature_regulation to temperature list, with very low importance
result.setdefault('temperature', []).append((importance - 100, modname))
elif meaning_name == 'temperature' and moddesc['parameters'].get('target'):
result.setdefault('temperature_regulation', []).append((importance, modname))
return result
class FrappyManager(Readable):
"""start/stops to frappy servers or connects to SEC nodes"""
# TODO: make prefix a parameter
parameters = {
'prefix': Param("Prefix for the generated devices",
type=str, default='se_', settable=True),
'secnodes': Param('name=uri of secnodes, space separated',
type=str, default='', settable=True),
}
parameter_overrides = {
'unit': Override(unit='', mandatory=False),
}
valuetype = str # the main value is a comma separated list
_running = None
_frappy_control = None
_box2uri = None
_uri2box = None
_secnodes = None # a dict like Secnodes instance
def doInit(self, mode):
if SETUPDIR not in session._setup_paths:
session.log.error('can not use frappy as %r is not in the setup_subdirs', SETUPDIR)
expt = session.experiment
if 'persistent_environment' in expt.parameters:
expt.persistent_environment = [v for k, v in MEANINGS.items() if k not in SKIP_ENV]
else:
cls = type(expt)
session.log.warning(
'experiment (%s.%s) does not support persistent environment',
cls.__module__, cls.__qualname__)
self._check_secnodes(True)
def doUpdateSecnodes(self, value):
self._secnodes = secnodes_from_string(value)
self._value = shorten(value)
def doRead(self, maxage=0):
return self._value
def _check_secnodes(self, check_connection):
fc = self._frappy_control = FrappyControl('this')
self._uri2box = {}
self._box2uri = {}
for nodename, uri in fc.config.sections.get('boxes', {}).items():
self._box2uri[nodename] = uri
self._uri2box[uri] = nodename
fc.get_cfg_info()
self._running = fc.running()
if self._secnodes is None:
secnodes = secnodes_from_string(self.secnodes)
else:
secnodes = {k: v for k, v in self._secnodes.items() if k in self._running or not v.startswith('localhost:')}
for name, uri in self._secnodes:
if name not in self._running and uri.startswith('localhost.'):
continue
secnodes[name] = uri
if check_connection:
for name, uri in list(secnodes.items()):
if not uri.startswith('localhost:'):
secopclient = self.connect_secnode(uri)
if not secopclient:
secnodes.pop(name)
continue
secopclient.disconnect()
self._box2uri[name] = uri
self._uri2box[uri] = name
self._secnodes = secnodes
self.secnodes = secnodes_to_string(secnodes)
self._value = shorten(self.secnodes)
def doPoll(self, maxage=0):
self.doRead()
def doStatus(self, maxage=0):
return status.OK, ''
def get_uri_name(self, arg, service=None):
"""return (<uri>, <name>) from arg"""
if ':' in arg:
return arg, self._uri2box.get(arg) or arg.replace(':', '_')
uri = self._box2uri.get(arg)
if uri is None:
if service:
uri = service(f'localhost:{self._frappy_control.get_port(service.name)}')
else:
uri = self._frappy_control.cfg_info.get(arg)
if service:
return service(uri), arg
return uri, arg
def connect_secnode(self, uri):
"""connect to secnode
do not forget to disconnect if no longer used
"""
secopclient = NicosSecopClient(uri)
try:
secopclient.connect(30)
return secopclient
except Exception:
return None
def __call__(self, main=None, stick=None, addons=None, *extra, force=False, update=True):
self._check_secnodes(False)
fc = self._frappy_control
# determine which servers to stop/start
tostop = []
tostart = {}
main_item = None
stick_item = None
for name, uri in self._secnodes:
if isinstance(uri, Main):
main_item = name
elif isinstance(uri, Stick):
stick_item = name
if main is not None:
if main_item:
tostop.append(main_item)
self._secnodes.pop(main_item)
if main:
tostart[main] = Main
# auto stick:
if main_item != main and stick is None:
try:
cfg = f'{main}stick'
service, cfgfile = fc.cfg_file(None, 'stick', cfg)
if service == 'stick':
stick = cfg
except FileNotFoundError:
pass
if stick is not None:
if stick_item:
tostop.append(stick_item)
self._secnodes.pop(stick_item)
if stick:
tostart[stick] = Stick
for arglist in (addons,) + extra:
if arglist == '':
tostop.extend(self._secnodes)
elif arglist is not None:
# allow legacy '<cfg1>,<cfg2>'
for arg in arglist.split(','):
if arg:
tostart[arg] = None
toremove = []
for cfg in self._running:
if cfg is None:
continue
# TODO: when using nodename for setup, this may not work here ...
setup = self.get_setup_name(cfg)
if setup in session.loaded_setups:
toremove.append(setup)
if ':' in cfg:
continue
if toremove:
RemoveSetup(*toremove)
for cfg in tostop:
fc.stop(cfg)
if cfg not in tostart:
fc.delete_frappy_service(cfg)
toadd = []
# add and start new servers
for arg, service in tostart.items():
uri, name = self.get_uri_name(arg, service)
cfginfo = ''
if isinstance(uri, Service):
port = fc.get_port(uri.name)
uri = fc.add_frappy_service(uri.name, name, port, session.log)
cfginfo = f' ({name})'
fc.start(name)
session.log.info('wait for startup of %r%s', uri, cfginfo)
secopclient = self.connect_secnode(uri)
if secopclient:
self.write_setup_file(secopclient, uri, name, arg)
secopclient.disconnect()
toadd.append(self.get_setup_name(name))
else:
self._secnodes.pop(name, None)
session.log.exception('cannot connect to %r%s', uri, cfginfo)
session.readSetups()
if toadd:
AddSetup(*toadd)
self._running = self._frappy_control.running()
return self.read()
def get_setup_name(self, cfg):
cfg = cfg.replace(':', '_')
return f'se_{cfg}'
def _get_device_name(self, device_mapping, module):
"""get a modules mapped name according to device_mapping or None,
if unmapped
"""
if module in device_mapping and 'name' in device_mapping[module]:
return self.device_mapping[module]['name']
return self.prefix + module
def node_setup_info(self, secopclient):
"""determine aliases and envlist for SECoP devices
depending on their meaning
"""
modules = secopclient.modules
result = {} # dict <meaning name> of list of (<importance>, <target>)
device_mapping = {}
reserved_names = {v.lower() for v in MEANINGS.values()}
for modname, moddesc in modules.items():
if modname.lower() in reserved_names:
device_mapping[modname] = {'name': f'{modname}_'}
# meanings = self.get_meanings(modname, moddesc)
meaning = moddesc['properties'].get('meaning')
if meaning:
meaning_name, importance = meaning
if meaning_name not in MEANINGS:
self.log.warning('%s: meaning %r is unknown', modname, meaning_name)
continue
result.setdefault(meaning_name, []).append((importance, modname))
if meaning_name == 'temperature_regulation':
# add temperature_regulation to temperature list, with very low importance
result.setdefault('temperature', []).append((importance - 100, modname))
elif meaning_name == 'temperature' and moddesc['parameters'].get('target'):
result.setdefault('temperature_regulation', []).append((importance, modname))
envlist = []
alias_config = {}
for meaning_name, info in result.items():
importance, modname = sorted(info)[-1]
devname = self._get_device_name(device_mapping, modname)
target = MEANINGS.get(meaning_name)
alias_config[target] = {devname: importance, NOT_USED: -100}
if target == 'a3' and meaning_name == 'rotation_z':
alias_config['om'] = {devname: importance}
if target not in SKIP_ENV:
envlist.append(target)
return envlist, alias_config, device_mapping
def write_setup_file(self, secopclient, uri, name, origname):
setup = self.get_setup_name(name)
setup_file = Path(SETUPDIR) / f'{setup}.py'
if name != origname:
name = IDSUB.sub(secopclient.nodename.replace('.psi.ch', ''), '_')
self._box2uri[name] = uri
envlist, alias_config, devmap = self.node_setup_info(secopclient)
nodeargs = f'device_mapping={devmap!r}' if devmap else ''
additions = []
if envlist:
aliassetups = [f'{k}_alias' for k in envlist]
# additions.append(f'includes = {aliassetups!r}')
if alias_config:
additions.append(f'alias_config = {alias_config!r}')
desc = secopclient.properties.get('description') or name
setup_content = SETUP_TEMPLATE % {
'nodename': name,
'prefix': self.prefix,
'desc': desc.split('\n')[0],
'uri': repr(uri),
'nodeargs': nodeargs,
'additions': '\n'.join(additions),}
setup_file.write_text(setup_content)
prev_mode = setup_file.stat().st_mode
if not prev_mode & 0x20:
setup_file.chmod(prev_mode | 0x20)
return envlist
def adjustEnvironment(self):
if self._mode == SIMULATION:
return
samenvlist = []
for meaning, devname in MEANINGS.items():
dev = session.devices.get(devname)
if meaning in SKIP_ENV:
continue
if dev:
if dev.alias != NOT_USED:
samenvlist.append(devname)
elif devname in session.alias_config:
self.log.warning('can not find alias %r', devname)
expt = session.experiment
try:
expt.persistent_environment = samenvlist
except Exception:
pass
envlist = expt.sampleenv
previous = set(envlist)
for devname in samenvlist:
dev = session.devices.get(devname)
if dev is None:
if devname in previous:
envlist.remove(devname)
elif devname not in previous:
envlist.append(devname)
expt.setEnvironment(envlist)
def cleanup_defunct():
for devname, setupname in list(session.dynamic_devices.items()):
dev = session.devices.get(devname)
if dev and dev._defunct:
devnames = [d.name for d, _ in get_attaching_devices(dev)]
if devnames:
session.log.warning('can not remove device %r due to dependencies on %s'
% (devname, ', '.join(devnames)))
else:
session.destroyDevice(devname)
session.dynamic_devices.pop(devname, None)
class FrappyNode(SecNodeDevice):
"""SEC node device, works together with superfrappy"""
parameters = {
'param_category': Param("category of parameters\n\n"
"set to 'general' if all parameters should appear in the datafile header",
type=str, default='', settable=True),
'quiet_init': Param('flag to set loglevel to error while initializing',
type=bool, default=False, settable=True),
# duplicate from SecNodeDevice. needed for the case where the code for
# the SecNodeDevcice is not up to date, but the setup is already new
# does not yet have the i
'general_stop_whitelist': Param('module names to accept general stop',
type=listof(str), prefercache=False,
default=[], userparam=False),
}
_lastcfg = None
# _marche = None
def doInit(self, mode):
# self._marche = MarcheControl()
if mode != SIMULATION and session.sessiontype != POLLER:
pass
# TODO:
# host_port = self.uri.rsplit('://')[-1]
# status = self._marche.status(config.instrument)
# running = self._attached_superfrappy.check_running(host_port)
# if self.frappycfg and running != self.frappycfg:
# self.superfrappy.add_server(host_port)
super().doInit(mode)
def createDevices(self):
super().createDevices()
self.log.info('--- create devices ---')
secnode = self._attached_secnode
for devname, (_, devcfg) in self.setup_info.items():
meanings = get_meanings(devcfg['secop_module'], devcfg['secop_properties'])
for meaning in meanings:
alias = MEANINGS.get(meaning)
if alias and alias not in session.devices:
self.log.info('create alias %r for %s', alias, meaning)
# alias is not yet created
devcfg = ('nicos.core.device.DeviceAlias', {'descripton': meaning})
session.configured_devices[alias] = devcfg
session.dynamic_devices[alias] = 'frappy'
session.createDevice(alias, recreate=True, explicit=True)
elif not alias:
self.log.info('do not know meaning %s', meaning)
if self.param_category:
params_cfg = devcfg['params_cfg']
dev = session.devices[devname]
for pname, pargs in params_cfg.items():
pinfo = dev.parameters[pname]
if not pinfo.category:
pinfo.category = self.param_category
def makeDynamicDevices(self, setup_info):
patched_loggers = {}
if self.quiet_init:
for devname, (_, devcfg) in setup_info.items():
log = session.getLogger(devname)
if log not in patched_loggers:
result = [loggers.INFO] # default level
patched_loggers[log] = result
log.setLevel(loggers.ERROR)
# avoid level change when the loglevel parameter is treated
# store level instead in result
log.__dict__['setLevel'] = result.append
try:
super().makeDynamicDevices(setup_info)
finally:
for log, result in patched_loggers.items():
log.__dict__.pop('setLevel', None) # re-enable setLevel
log.setLevel(result[-1]) # set to stored or default value
def disable(self):
seaconn = session.devices.get('sea_%s' % self.service)
if seaconn and seaconn._attached_secnode:
seaconn.communicate('frappy_remove %s' % self.service)
self._set_status(*self._status)
def _set_status(self, code, text):
if self.uri == '':
code, text = status.DISABLED, 'disabled'
SecNodeDevice._set_status(self, code, text)
# def restart(self):
# """restart frappy server"""
# host_port = self.uri.rsplit('://')[-1]
# self._marche.restart(config.instrument, self.frappycfg)
def get_info(self):
result = self.doRead() or ''
code, text = self.status()
if not result:
return '<disconnected>'
if code == status.OK or result == '':
return result
if (code, text) == (status.ERROR, 'reconnecting'):
return '%s (frappy not running)' % result
return '%s (%s)' % (result, text)
class NullDevice(Moveable):
"""dummy target for sample environment aliases"""
parameter_overrides = {
'unit': Override(unit='', mandatory=False),
}
def doRead(self, maxage=0):
return 0
def doStatus(self, maxage=0):
return status.DISABLED, 'disabled'
def doStart(self, target):
self.log.warning('disabled, cannot not move')