Files

1641 lines
64 KiB
Python

# *****************************************************************************
# NICOS, the Networked Instrument Control System of the MLZ
# Copyright (c) 2009-2025 by the NICOS contributors (see AUTHORS)
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
"""Support for SECoP
This module contains the SecNodeDevice, which represents the connection to a
SECoP server, and the base classes SecopDevice, SecopReadable and SecopMoveable.
A SecNodeDevice is connected to a SecNode by assigning the uri value, this
can be done at any time. With auto_create == True, all devices are created
automatically after connections, and they are altered on reconnection.
For a setup for permanent use, with auto_create == False (the default), the
devices are to be created with the class SecopDevice, SecopReadable or
SecopMoveable, and the following parameters:
- secnode: the secnode device name
- secop_module: the name of the remotely connected SECoP module
- params_cfg (optional): a dict {<parameter name>: <cfg dict>}
if given, only the parameters mentioned as keys are exported.
The cfg dict may be empty for using the automatically generated attributes:
description, datainfo, settable, unit and fmtstr.
The data type must be given as SECoP datainfo converted from json,
not as NICOS type!
Attributes given in the cfg dict may overwrite above attributes.
"""
import re
import time
from collections import defaultdict
from math import floor, log10
from threading import Event, RLock
from frappy.client import SecopClient
from frappy.datatypes import StatusType
from frappy.errors import CommunicationFailedError, ReadFailedError
from nicos import session
from nicos.core import POLLER, SIMULATION, Attach, DeviceAlias, HasLimits, \
HasOffset, NicosError, Override, Param, status, usermethod
from nicos.core.device import Device, DeviceMeta, DeviceMetaInfo, \
DeviceParInfo, Moveable, Readable, Measurable
from nicos.core.errors import CommunicationError, ConfigurationError, UsageError
from nicos.core.params import anytype, dictof, floatrange, intrange, listof
from nicos.core.utils import formatStatus
from nicos.devices.secop.validators import get_validator
from nicos.protocols.cache import cache_dump
from nicos.utils import createThread, importString, merge_dicts, printTable
try:
    # TODO: remove try/except when frappy is updated on all sites
    from frappy.datatypes import visibility_validator
except ImportError:
    # legacy SECoP visibility (name or level number) -> 3-character form
    LEGACY_VISIBILITY = {'user': 'www', 1: 'www', 'advanced': 'ww-', 2: 'ww-',
                         'expert': 'w--', 3: 'w--'}

    def visibility_validator(value):
        """Fallback validator for the SECoP visibility property.

        Legacy values (see LEGACY_VISIBILITY) are converted to the
        3-character form; a value is accepted when it has length 3 and
        consists only of the characters 'w', 'r' and '-'.

        :raises ValueError: for any other value, including values which are
            unhashable or have no length (e.g. an unknown integer level)
        """
        try:
            value = LEGACY_VISIBILITY.get(value, value)
            if len(value) == 3 and set(value) <= set('wr-'):
                return value
        except TypeError:
            # unhashable or unsized input: report it as an invalid value
            pass
        raise ValueError(f'{value!r} is not a valid visibility')
class NicosSecopClient(SecopClient):
    """SecopClient with a name mangling suited for NICOS devices."""

    # names with a special treatment in SecopDevice.makeDevClass
    SPECIAL_NAMES = {'target', 'status', 'stop', 'pollinterval', 'reset'}

    def internalize_name(self, name):
        """name mangling

        - names matching the names of NICOS Moveable attributes
          are mangled, as they would conflict.
        - however, names with a special treatment in SecopDevice.makeDevClass
          are *not* mangled

        Returns the (lower case) internal name.
        """
        if name in self.SPECIAL_NAMES:
            return name
        name = super().internalize_name(name).lower()
        prefix, sep, postfix = name.partition('_')
        if prefix not in dir(SecopMoveable):
            return name
        # mangle names matching NICOS device attributes
        # info -> info_, info_ -> info_1, info_1 -> info_2, ...
        if not sep:
            return name + '_'
        if not postfix:
            # name already ends with the separator: append the number only.
            # (name + '_1' would give 'info__1', which is left unchanged
            # below and therefore would not be unique)
            return name + '1'
        try:
            num = int(postfix)
        except ValueError:
            return name
        if str(num) == postfix:
            return '%s_%d' % (prefix, num + 1)
        return name
class DefunctDevice(Exception):
    # NOTE(review): presumably signals access to a device which was made
    # defunct - no raise/except site is visible in this part of the file
    pass
def clean_identifier(anystring):
    """Return anystring converted to something usable as an identifier.

    Each run of non-word characters is replaced by one underscore, and an
    underscore is prepended when the string starts with a digit.
    """
    cleaned = re.sub(r'\W+|^(?=\d)', '_', anystring)
    return str(cleaned)
def type_name(typ):
    """Return the name of typ, or the name of its class if it has no name."""
    try:
        name = typ.__name__
    except AttributeError:
        name = typ.__class__.__name__
    return name
def get_aliases(dev):
    """get the aliases pointing to dev

    return a list containing for each alias a tuple (device, required class).
    """
    # scan all session devices for DeviceAlias instances aliased to dev
    return [(candidate, candidate._cls)
            for candidate in session.devices.values()
            if isinstance(candidate, DeviceAlias)
            and candidate.alias == dev.name]
def get_attaching_devices(dev):
    """get the devices dev is attached to

    return a list of tuples (device, required class).
    """
    found = []
    for holder_name in dev._sdevs:
        holder = session.devices.get(holder_name)
        if not holder:
            continue
        # collect every attachment slot of holder which is occupied by dev
        found.extend(
            (holder, att.devclass)
            for aname, att in holder.attached_devices.items()
            if getattr(holder, '_attached_' + aname, None) == dev)
    return found
def class_from_interface(module_properties):
    """Return the device class for the given SECoP module properties.

    The first interface class known in IF_CLASSES (as given, or title-cased)
    wins; SecopDevice is returned when none matches.
    """
    candidates = (module_properties.get('interface_classes', []) or
                  module_properties.get('interface_class', []))
    for ifclass in candidates:
        found = IF_CLASSES.get(ifclass)
        if found:
            return found
        try:
            return IF_CLASSES[ifclass.title()]
        except KeyError:
            pass
    return SecopDevice
def secop_base_class(cls):
    """Return the first class in the MRO of cls contained in ALL_IF_CLASSES.

    Raises TypeError when there is no such class.
    """
    base = next((b for b in cls.__mro__ if b in ALL_IF_CLASSES), None)
    if base is None:
        raise TypeError(
            'secop_base_class argument must inherit from SecopDevice')
    return base
def get_exc_name(exc):
    """Return a name for exc: SECoPError, NicosError or other error.

    The first truthy value of the attributes 'name' and 'category' is used,
    falling back to the name of the exception class.
    """
    for attr in ('name', 'category'):
        label = getattr(exc, attr, 0)
        if label:
            return label
    return type(exc).__name__
def make_nicos_error(exc, category=None):
    """convert SECoP error to nicos error

    nicer logging compared to NicosError(str(exc)):
    "<secop-error-class> - <str(exc)>" instead of "Error - <str(exc>)>"

    :param exc: the exception to convert; a NicosError is returned unchanged
    :param category: category used for logging, defaults to get_exc_name(exc)
    :return: an exception inheriting from type(exc) and NicosError
    """
    if isinstance(exc, NicosError):
        return exc
    if category is None:
        category = get_exc_name(exc)
    # do not repeat category when contained already at start
    # (category is arbitrary text: escape it, so that it is matched literally)
    text = re.sub(f'^{re.escape(str(category))} ?[-:]? ', '', str(exc))
    # NicosLogger needs category to be a class attribute
    # still inherit from SECoP error class
    return type('NicosSecopError', (type(exc), NicosError),
                {'category': category})(text)
# NOTE(review): module level dict, not referenced in the visible part of
# this file - presumably used further down; confirm before removing
_last_setup_info = {}
class SecNodeDevice(Readable):
    """SEC node device.

    Holds the client connection to a SECoP server.
    It should have a status, therefore it is based on Readable.
    """

    parameters = {
        'prefix': Param("Prefix for the generated devices\n\n"
                        "'$' will be replaced by the equipment id."
                        " It should end with a trailing underscore",
                        type=str, default='$_', settable=True),
        'uri': Param('tcp://<host>:<port>', type=str, settable=True),
        'auto_create': Param('Flag for automatic creation of devices',
                             type=bool, prefercache=False, userparam=False),
        'setup_info': Param('Setup info', type=anytype, default={},
                            settable=True),
        'visibility_level': Param('level for devlist visibility of created devices',
                                  type=intrange(1, 3), prefercache=False,
                                  default=1, userparam=False),
        'async_only': Param('True: inhibit SECoP reads on created devices, '
                            'use events only', type=bool, prefercache=False,
                            default=False, userparam=False),
        'allow_list': Param('list of device names to allow creating',
                            ext_desc='The original names found on the'
                            ' SecNode are looked at. If the allow_list is'
                            ' empty, all modules are allowed',
                            type=listof(str), prefercache=False, default=[],
                            userparam=False),
        'device_mapping': Param('Dict of mappings for Name and Mixins ',
                                type=dictof(str, dictof(str, anytype)),
                                prefercache=False, default={}, userparam=False),
        'general_stop_whitelist': Param('module names to accept general stop',
                                        type=listof(str), prefercache=False,
                                        default=[], userparam=False),
        'general_stop_blacklist': Param('module names to ignore general stop',
                                        type=listof(str), prefercache=False,
                                        default=[], userparam=False)
    }
    # the extended descriptions below are runtime strings: keep them verbatim
    parameters['device_mapping'].ext_desc = """
Dictionary name -> mapping where mapping is a dictionary which can contain
the following keys:
name -> remapped name of the device
mixins -> list of names of mixin classes that should be added to the class
parameters -> list of parameters for configuration of the mixins
For example, from a secnode with the modules examplemodule and barcodes,
among others:
.. code:
secnode = device(
'nicos.devices.secop.SecNodeDevice',
description='doc demo',
uri = 'tcp://localhost:10767',
auto_create = True,
unit='',
device_mapping = {
'examplemodule': {
'name': 'ExampleName',
},
'barcodes': {
'mixins':
['nicos_mlz.devices.barcodes.BarcodeInterpreterMixin'],
},
'parameters': {
'commandmap': {},
},
},
allow_list = ['barcodes', 'examplemodule'],
),
Barcodes will then be mapped to a NICOS-device called nodname_barcodes and
examplemodule will become a device named ExampleName.
"""
    parameters['general_stop_whitelist'].ext_desc = """
If this value is not empty, it is a list of SECoP module names accepting
general stop, modules not listed will ignore general stop. The parameter
'general_stop_blacklist' must be empty in this case.
"""
    parameters['general_stop_blacklist'].ext_desc = """
This value contains a list of SECoP module names ignoring
general stop, Modules not listed will accept general stop.
"""
    parameter_overrides = {
        'unit': Override(default='', mandatory=False),
        # maxage is not used on the SecNodeDevice itself, but is the base
        # for calculating a slightly higher value on the SecopReadable
        # instances
        'maxage': Override(default=600, userparam=False),
        # not used:
        'pollinterval': Override(default=None, userparam=False, settable=False),
    }
    valuetype = str
    _secnode = None  # the NicosSecopClient, None while unconnected
    _value = ''  # last value returned by doRead
    _status = status.OK, 'unconnected'  # the nicos status
    _devices = {}  # device name -> device, replaced per instance in doPreinit
    _custom_callbacks = defaultdict(list)  # (module, parameter) -> callbacks
    _polled_devs = ()  # devices handled by the poll thread, set in doInit
    __poll_thread = None
    # NOTE(review): the next two are not used in the visible part of the file
    _acq_controllers = {}
    _acq_channels = {}
def doPreinit(self, mode):
    """Create the per-instance containers.

    Both class attributes are mutable. Without this, the registry of custom
    callbacks would be shared between all SEC nodes, and unregistering a
    device on one node could drop callbacks registered on another node
    having a module with the same name.
    """
    self._devices = {}
    self._custom_callbacks = defaultdict(list)
def doInit(self, mode):
    """Initialize the SEC node device.

    In simulation mode no connection is made: the setup info is taken from
    get_setup_info() and either used for creating the devices (auto_create)
    or stored in the setup_info parameter.
    In any other session than the poller, a connection is tried when a uri
    is given, and the internal poll thread is started.
    """
    if mode == SIMULATION:
        setup_info = self.get_setup_info()
        if self.auto_create:
            self.makeDynamicDevices(setup_info)
        else:
            self._setROParam('setup_info', setup_info)
    elif session.sessiontype != POLLER:
        if self.uri:
            try:
                self._connect()
            except Exception:
                # a failing connect must not prevent creation of this device
                self.log.exception('during initial connect')
        if self.maxage is None:
            self.maxage = 600
        # set in doShutdown for terminating the poll thread
        self.__shutdown = Event()
        self._polled_devs = [dev for dev in self._devices.values()
                             if isinstance(dev, SecopReadable)]
        self.__poll_thread = createThread('poll-thread', self.__poll)
def get_setup_info(self):
    """Return the setup info of the devices of this SEC node.

    In simulation mode it is taken from the session's sync db,
    else from the setup_info parameter.
    """
    if self._mode != SIMULATION:
        return self.setup_info
    syncdb = session._getSyncDb()
    # we might do here in addition:
    session.simulationSync(syncdb)
    return syncdb.get('%s/setup_info' % self.name.lower())
def __poll(self):
    """simple poll thread

    needed only for making sure value and status do not expire.
    we can not use the regular poller for this, as dynamic devices
    are not registered there

    The thread ends when there is nothing to poll, when this device has
    no cache (anymore) or when the shutdown event is set.
    """
    if not self._polled_devs:  # avoid busy loop
        return
    while True:
        if not self._cache:  # defunct sec node
            return
        for dev in self._polled_devs:
            # one second delay per device, returns early on shutdown
            if self.__shutdown.wait(1):
                return
            # status()/read() do only call doStatus()/doRead() when the
            # value is older than self.maxage. the maximum age will then be
            # a little higher, as calculated in SecopReadable.doReadMaxage
            try:
                dev.status(self.maxage)
            except Exception as e:
                dev.log.debug('error calling status(): %r', e)
            try:
                dev.read(self.maxage)
            except Exception as e:
                dev.log.debug('error calling read(): %r', e)
def doRead(self, maxage=0):
    """Return the node name of the connected SEC node.

    Without a client the value is the empty string; while the client is
    not online the last value is kept.
    """
    node = self._secnode
    if not node:
        self._value = ''
    elif node.online:
        self._value = node.nodename
    return self._value
def doStatus(self, maxage=0):
    """Return the status tuple last stored by _set_status (maxage is unused)."""
    return self._status
def _set_status(self, code, text):
    """Store the status (code, text) and publish status and value.

    Nothing is published when this device has no cache.
    """
    self._status = code, text
    if self._cache:
        self._cache.put(self, 'status', self._status)
        # the value depends on the connection state: update it as well
        self._cache.put(self, 'value', self.doRead())
def doWriteUri(self, value):
    """change uri and reconnect

    An empty uri disconnects. No connection is made when simulating.
    """
    # make sure uri is set before reconnect
    self._setROParam('uri', value)
    if not self.uri:
        self._disconnect()
    elif not self._sim_intercept:
        self._connect()
    return value
def _connect(self):
    """try to connect

    called on creation and on uri change,
    but NOT on automatic reconnection

    With an empty uri, the node is disconnected and nothing else happens.
    When the connection can not be established, the status is set to
    ERROR and connecting is retried in the background. Any other error
    than CommunicationFailedError is raised.
    """
    if not self.uri:
        # there is nothing to connect to: do not create a client
        self._disconnect()
        return
    if self._secnode:
        self._secnode.disconnect()
    self._secnode = NicosSecopClient(self.uri, log=self.log)
    self._secnode.register_callback(None, self.nodeStateChange,
                                    self.descriptiveDataChange)
    try:
        self._secnode.connect(10)
        self._set_status(status.OK, 'connected')
        self.createDevices()
        return
    except CommunicationFailedError as e:
        self.log.warning('can not connect to %s (%s), retry in background',
                         self.uri, e)
    self._set_status(status.ERROR, 'try connecting')
    # spawn_connect needs a callback, the event itself is not waited for
    start_event = Event()
    self._secnode.spawn_connect(start_event.set)
def _disconnect(self):
    """Remove the devices and drop the connection (no-op when unconnected)."""
    if not self._secnode:
        return
    # devices are removed while the client is still available
    self.removeDevices()
    self._secnode.disconnect()
    self._set_status(status.OK, 'unconnected')
    self._secnode = None
def descriptiveDataChange(self, module, description):
    """called when descriptive data changed
    after an automatic reconnection

    Both arguments are ignored, all devices are created again from the
    current description.
    """
    self.log.warning('node description changed')
    self.createDevices()
def get_status(self, online, state):
    """Return the nicos status tuple for the given connection state.

    ERROR when not online, OK when state is 'connected', else WARN.
    """
    if not online:
        return status.ERROR, state
    if state == 'connected':
        return status.OK, state
    return status.WARN, state
def nodeStateChange(self, online, state):
    """called when the state of the connection changes

    'online' is True when connected or reconnecting, False when
    disconnected or connecting
    'state' is the connection state as a string

    The devices are informed when the node gets connected or goes offline,
    but not for intermediate states.
    """
    if not online:
        self._set_status(status.ERROR, 'reconnecting')
        connected = False
    elif state == 'connected':
        self._set_status(status.OK, 'connected')
        connected = True
    else:
        self._set_status(status.WARN, state)
        return
    for device in self._devices.values():
        device.setConnected(connected)
def doShutdown(self):
    """Stop the poll thread (if any) and disconnect from the SEC node."""
    if self.__poll_thread:
        self.__shutdown.set()
        self.__poll_thread.join()
    self._disconnect()
    # any device still registered at this point could not be removed
    if self._devices:
        self.log.error('can not remove devices %s', list(self._devices))
def _get_prefix(self):
    """Return the device name prefix with '$' replaced by the equipment name.

    The equipment name is the cleaned, lower case node name. Without a
    client, the prefix is returned as is.
    """
    node = self._secnode
    if node is not None:
        equipment_name = clean_identifier(node.nodename).lower()
        return self.prefix.replace('$', equipment_name)
    # at least when prefix does not contain '$', this works in dry run
    return self.prefix
def _get_device_name(self, module):
    """get the device name for a module

    This is the name given in device_mapping, or the prefixed module name
    if the module is unmapped.
    """
    try:
        return self.device_mapping[module]['name']
    except KeyError:
        return self._get_prefix() + module
@usermethod
def showModules(self):
    """show modules of the connected SECoP server

    and intended devices names using the given prefix and device_mapping
    """
    if self._sim_intercept:
        return
    if self._secnode is None:
        self.log.error('secnode is not connected')
        return
    # use the same naming as createDevices, so that names remapped
    # by device_mapping are shown correctly
    items = [
        (self._get_device_name(m), m,
         mod_desc.get(
             'properties', {}).get('description', '').split('\n')[0])
        for m, mod_desc in self._secnode.modules.items()]
    printTable(['foreseen device name', 'SECoP module', 'description'],
               items, self.log.info)
def registerDevice(self, device):
    """Register device for updates of the parameters of its SECoP module.

    For each parameter the method device._update_<parameter> (or
    device._update, if there is none) is registered as callback and called
    once with the cached data, if available.

    Raises OSError when unconnected and ConfigurationError when the SECoP
    module of the device does not exist on this SEC node.
    """
    if not self._secnode:
        raise OSError('unconnected')
    self.log.debug('register %s on %s', device, self)
    # note: the device is kept in _devices even when the check below fails
    self._devices[device.name] = device
    module = device.secop_module
    if module not in self._secnode.modules:
        raise ConfigurationError('no module %r found on this SEC node'
                                 % module)
    for parameter in self._secnode.modules[module]['parameters']:
        updatefunc = getattr(device, '_update_' + parameter,
                             device._update)
        self._secnode.register_callback((module, parameter),
                                        updateItem=updatefunc)
        try:
            data = self._secnode.cache[module, parameter]
            if data:
                # initial update from the data received already
                updatefunc(module, parameter, data)
            else:
                self.log.warning('No data for %s:%s', module, parameter)
        except KeyError:
            self.log.warning('No cache for %s:%s', module, parameter)
def unregisterDevice(self, device):
    """Remove device and unregister all its callbacks.

    This includes the custom callbacks registered for the parameters
    of the SECoP module of the device.
    """
    self.log.debug('unregister %s from %s', device, self)
    if self._devices.pop(device.name, None) is None:
        self.log.info('device %s already removed', device.name)
        return
    module = device.secop_module
    try:
        moddesc = self._secnode.modules[module]
    except KeyError:  # do not complain again about missing module
        return
    for parameter in moddesc['parameters']:
        # must be the same function as used in registerDevice
        updatefunc = getattr(device, '_update_' + parameter,
                             device._update)
        self._secnode.unregister_callback((module, parameter),
                                          updateItem=updatefunc)
        for custom_callback in self._custom_callbacks.pop((module, parameter), []):
            self._secnode.unregister_callback((module, parameter),
                                              updateItem=custom_callback)
def createDevices(self):
    """create drivers and devices

    for the devices to be created from the connected SECoP server

    For each module on the SEC node (restricted to allow_list, if given)
    a device configuration is built from the description of the module.
    With auto_create, the devices are then created dynamically, else the
    configuration is only stored in the setup_info parameter.
    """
    if not self._secnode:
        self.log.error('secnode is not connected')
        return
    setup_info = {}
    # keep track of configured devices to warn when one is configured
    # without being found on the SECNode or filtered by allow_list
    configured = set(self.device_mapping.keys())
    if self.general_stop_whitelist:
        if self.general_stop_blacklist:
            self.log.error('only one of general_stop_blacklist/'
                           'general_stop_whitelist may be given')
            self.log.error('-> general_stop_blacklist is ignored')
        general_stop_blacklist = (set(self._secnode.modules) -
                                  set(self.general_stop_whitelist))
    else:
        general_stop_blacklist = set(self.general_stop_blacklist)
    for module, mod_desc in self._secnode.modules.items():
        # If the module should not be created, skip it
        if self.allow_list and module not in self.allow_list:
            self.log.debug('skipping module %s, as it is not in the list'
                           ' of allowed modules', module)
            continue
        configured.discard(module)
        module_properties = mod_desc.get('properties', {})
        kwds = {
            'secop_properties': module_properties,
        }
        # Map device name according to config
        dev_name = self._get_device_name(module)
        self.log.debug('using device name %s for SECoP-module %s',
                       dev_name, module)
        # convert parameters and command description to the needed items,
        # especially avoid datatype or a nicos type here,
        # as this would need pickle to put into the cache.
        # datainfo is no problem
        params_cfg = {}
        for pname, props in mod_desc['parameters'].items():
            datainfo = props['datainfo']
            # take only the first line in description, else it would
            # messup ListParams
            # TODO: check whether is not better to do this in ListParams
            pargs = dict(datainfo=datainfo, description=(
                props['description'].splitlines() or [''])[0])
            if not props.get('readonly', True) and pname != 'target':
                pargs['settable'] = True
            unit = datainfo.get('unit', '')
            fmtstr = None
            if datainfo['type'] == 'double':
                fmtstr = datainfo.get('fmtstr', '%g')
            elif datainfo['type'] == 'scaled':
                # default: as many digits as needed to resolve the scale
                fmtstr = datainfo.get(
                    'fmtstr', '%%.%df' %
                    max(0, -floor(log10(datainfo['scale']))))
            if unit:
                pargs['unit'] = unit
            if pname == 'target':
                kwds['target_datainfo'] = datainfo
                pargs['datainfo'] = None
            elif pname == 'value':
                kwds['unit'] = unit
                if fmtstr is not None:
                    kwds['fmtstr'] = fmtstr
                else:
                    kwds['fmtstr'] = '%r'
                kwds['value_datainfo'] = datainfo
            if pname not in ('status', 'value'):
                if fmtstr is not None and fmtstr != '%g':
                    pargs['fmtstr'] = fmtstr
                params_cfg[pname] = pargs
        commands_cfg = {
            cname: {'description': props.get('description', ''),
                    'datainfo': props['datainfo']}
            for cname, props in mod_desc['commands'].items()}
        mapping = self.device_mapping.get(module, {})
        mixins = mapping.get('mixins', [])
        kwds.update(mapping.get('parameters', {}))
        cls = class_from_interface(module_properties)
        # cls is a class, not an instance: issubclass is the proper check
        if issubclass(cls, SecopReadable):
            kwds.setdefault('unit', '')  # unit is mandatory on Readables
        # convert legacy to new SECoP visibility
        secop_visibility = visibility_validator(
            module_properties.get('visibility', 'www'))
        visibility = {'namespace', 'metadata'}
        # TODO: do we need to restrict namespace visibility?
        # it seems this is not respected, as devices are created outside
        # CreateAllDevices()
        # TODO: we might have a metadata_level for changing metadata
        # visibility
        # TODO: handle readonly visibility properly
        if secop_visibility[3 - self.visibility_level] == 'w':
            # example: for "ww-" above is True for visibility level 2 and 3
            visibility.add('devlist')
        kwds['visibility'] = frozenset(visibility)
        if 'ignore_general_stop' in cls.parameters:
            kwds['ignore_general_stop'] = module in general_stop_blacklist
        mdescription = module_properties.get('description', '')
        # a nicos description should only have one line, strip the rest
        mdescription = mdescription.split('\n', 1)[0]
        desc = dict(secnode=self.name,
                    description=mdescription,
                    secop_module=module,
                    params_cfg=params_cfg,
                    commands_cfg=commands_cfg,
                    mixins=mixins,
                    **kwds)
        # the following test may be removed later, when no bugs appear ...
        dumped = cache_dump(desc)
        if 'cache_unpickle("' in dumped:
            self.log.error('module %r skipped - setup info needs pickle',
                           module)
            self.log.warning('%r', dumped)
        else:
            setup_info[dev_name] = (
                'frappy_sinq.secop.devices.%s' % cls.__name__, desc)
    if not setup_info:
        self.log.info('creating devices for %s skipped', self.name)
        return
    if self.auto_create:
        if configured:
            self.log.warning('configured modules were not found or were'
                             ' skipped during creation: %s!',
                             list(configured))
        self.makeDynamicDevices(setup_info)
    else:
        self._setROParam('setup_info', setup_info)
def removeDevices(self):
    """Remove the dynamic devices, by passing an empty setup info."""
    self.makeDynamicDevices({})
def makeDynamicDevices(self, setup_info):
    """create and destroy dynamic devices

    create devices from setup_info, and store the name of the setup
    creating the creator device in session.creator_devices for
    session.getSetupInfo()
    Based on the previous setup_info from self.setup_info,
    devices are created, recreated, destroyed or remain unchanged.
    If setup_info is empty, destroy all devices.

    Raises ConfigurationError when the setup containing this SEC node
    device can not be determined.
    """
    prevdevices = set(self.setup_info.keys())
    self._setROParam('setup_info', setup_info)
    # find setup of this secnode
    result = session.getSetupInfo()
    setupname = None  # only to avoid undefined-loop-variable later
    for setupname in session.loaded_setups:
        info = result.get(setupname, None)
        if info and self.name in info['devices']:
            break
    else:
        # not found in the loaded setups: look in all known setups
        for setupname, info in session._setup_info.items():
            if self.name in info:
                break
        else:
            raise ConfigurationError(f'can not find setup {list(session._setup_info)}')
    # add new or changed devices
    for devname, devcfg in setup_info.items():
        prevdevices.discard(devname)
        dev = session.devices.get(devname, None)
        if dev:
            if not isinstance(dev, SecopDevice) or (
                    dev._attached_secnode and
                    dev._attached_secnode != self):
                # name is in use by a foreign device: leave that untouched
                self.log.error('device %s already exists', devname)
                continue
            # build the config in the same form as devcfg, for comparing
            base = secop_base_class(dev.__class__)
            prevcfg = (base.__module__ + '.' + base.__name__,
                       dict(secnode=self.name, **dev._config))
        else:
            prevcfg = None
        session.configured_devices[devname] = devcfg
        session.dynamic_devices[devname] = setupname
        if prevcfg == devcfg:
            # unchanged, the device might just need to be revived
            if dev._defunct:
                dev.setAlive(self)
        else:
            if dev is None:
                # add new device
                session.createDevice(devname, recreate=True, explicit=True)
                dev = session.devices[devname]
            else:
                # modify existing device
                if dev._attached_secnode:
                    dev._attached_secnode.unregisterDevice(dev)
                try:
                    dev.replaceClass(devcfg[1])
                    dev.setAlive(self)
                except ConfigurationError:
                    # above failed because an alias or attaching device
                    # requires a specific class.
                    # make old device defunct and replace by a new device
                    session.destroyDevice(dev)
                    session.dynamic_devices.pop(devname, None)
                    session.createDevice(devname, recreate=True,
                                         explicit=True)
                    prevdevices.discard(devname)
                    dev = session.devices[devname]
            if not isinstance(dev, SecopReadable):
                # we will not get status updates for these
                dev.updateStatus()
    # NOTE(review): this set is filled below, but not used afterwards
    defunct = set()
    # defunct devices no longer available
    for devname in prevdevices:
        dev = session.devices.get(devname)
        if dev is None or dev._attached_secnode != self:
            continue
        # do not consider temporary devices
        sdevs = [att for att in dev._sdevs if att in session.devices]
        if sdevs:
            self.log.warning('defunct device is attached to %s',
                             ', '.join(sdevs))
        dev.setDefunct()
        defunct.add(devname)
    # inform client that setups have changed
    session.setupCallback(list(session.loaded_setups),
                          list(session.explicit_setups))
def register_custom_callback(self, module, parameter, f):
    """Register custom callbacks for parameter updates.

    Use the method found on the SecopDevice class instead of this directly.

    Raises ValueError when the module or the parameter does not exist on
    this SEC node.
    """
    # TODO: or NameError?
    if module not in self._secnode.modules:
        raise ValueError('no module %r found on this SEC node' % module)
    if parameter not in self._secnode.modules[module]['parameters']:
        raise ValueError('no parameter %r found on module %r of this SEC node'
                         % (parameter, module))
    # remember the callback, unregisterDevice uses this for cleaning up
    self._custom_callbacks[(module, parameter)].append(f)
    self._secnode.register_callback((module, parameter), updateItem=f)
    self.log.debug('registered callback %r for %s:%s', f.__name__,
                   module, parameter)
def unregister_custom_callback(self, module, parameter, f):
    """Unregister a custom callback on this Node (prefer function on SecopDevice).

    This undoes register_custom_callback: f is removed from the list of
    custom callbacks and unregistered on the SECoP client.

    Raises ValueError when the module or the parameter does not exist on
    this SEC node, or when f is not registered as callback.
    """
    # TODO: or NameError?
    if module not in self._secnode.modules:
        raise ValueError('no module %r found on this SEC node' % module)
    if parameter not in self._secnode.modules[module]['parameters']:
        raise ValueError('no parameter %r found on module %r of this SEC node'
                         % (parameter, module))
    try:
        # list.remove raises ValueError when f was never registered;
        # .get avoids creating an empty entry in the defaultdict
        self._custom_callbacks.get((module, parameter), []).remove(f)
        self._secnode.unregister_callback((module, parameter), updateItem=f)
    except ValueError as e:
        raise ValueError('function not registered as callback!') from e
    self.log.debug('removed callback %r from %s:%s', f.__name__,
                   module, parameter)
def filter_parameters(self, module, parameters):
    """filter given parameters by their existence in given module

    Returns the list of those parameters p, for which (module, p) is
    contained in the identifier mapping of the SECoP client.
    """
    return [p for p in parameters if (module, p) in self._secnode.identifier]
class SecopDevice(Device):
    """Represent a SECoP module.

    Has a status (for the connection and errors on parameters)
    but no value, therefore not based on Readable
    """

    attached_devices = {
        'secnode': Attach('sec node', SecNodeDevice),
    }
    parameters = {
        'secop_module': Param('SECoP module', type=str, settable=False,
                              userparam=False),
        'secop_properties': Param('SECoP module properties',
                                  type=anytype, settable=False,
                                  userparam=False),
        'mixins': Param('Mixins to add to this Device', type=listof(str),
                        settable=False, userparam=False, default=[]),
    }
    _defunct = False  # checked in SecNodeDevice.makeDynamicDevices
    _cache = None
    _inside_read = False
    # overridden by a dict for the validators of 'value', 'status' and 'target':
    _maintypes = ()
@classmethod
def makeDevClass(cls, name, **config):
    """create a class with the needed doRead/doWrite methods

    for accessing the assigned SECoP module

    The description of the module is taken from the setup info of the
    SEC node device named in config['secnode'] and merged with config.
    The returned class is derived from cls and from the mixins configured
    or needed for the features and limit parameters of the module.
    The merged configuration is stored temporarily on the returned class
    as _modified_config.
    """
    secnodedev = session.getDevice(config['secnode'])
    # get config from SECNode
    setup_info = secnodedev.get_setup_info()
    # The key in setup_info is the mapped/prefixed name a device gets when
    # dynamically created. With static creation, this might differ from
    # the 'name' argument.
    dynamic_devname = secnodedev._get_device_name(config['secop_module'])
    seccfg = setup_info[dynamic_devname][1]
    # configuration must override SECoP description
    devcfg = merge_dicts(seccfg, config)
    add_mixins = devcfg.pop('mixins', [])
    # create parameters and methods
    parameters = {}
    # validators of special/pseudo parameters
    maintypes = {'value': anytype, 'status': tuple, 'target': anytype}
    attrs = dict(parameters=parameters, _maintypes=maintypes,
                 __module__=cls.__module__, __qualname__=cls.__qualname__)
    if 'value_datainfo' in devcfg:
        maintypes['value'] = get_validator(
            devcfg.pop('value_datainfo'), use_limits=False)
    if 'target_datainfo' in devcfg:
        attrs['valuetype'] = maintypes['target'] = get_validator(
            devcfg.pop('target_datainfo'), use_limits=True)
    for pname, kwargs in devcfg['params_cfg'].items():
        typ = get_validator(kwargs.pop('datainfo'),
                            use_limits=kwargs.get('settable', False))
        if 'fmtstr' not in kwargs and (typ is float or
                                       isinstance(typ, floatrange)):
            # the fmtstr default differs in SECoP and NICOS
            # copy kwargs as it may be read only
            kwargs = dict(kwargs, fmtstr='%g')
        parameters[pname] = Param(volatile=True, type=typ, **kwargs)
        if pname == 'target':
            continue  # special treatment of target in doReadTarget

        # pname is bound as default argument, as the closure would
        # else see the value of the last loop iteration
        def do_read(self, pname=pname):
            return self._read(pname, None)

        attrs['doRead%s' % pname.title()] = do_read
        if kwargs.get('settable', False):
            def do_write(self, value, pname=pname):
                # nothing is written when simulating
                if not self._sim_intercept:
                    return self._write(pname, value)

            attrs['doWrite%s' % pname.title()] = do_write
    for cname, cmddict in devcfg['commands_cfg'].items():
        if cname == 'reset':
            continue  # special treatment of reset command in doReset

        def makecmd(cname, datainfo, description):
            # create the method for calling the SECoP command cname,
            # its signature depends on the datatype of the argument
            argument = datainfo.get('argument')
            validate_result = get_validator(
                datainfo.get('result'), use_limits=False)
            if argument is None:
                help_arglist = ''

                def cmd(self):
                    return validate_result(self._call(cname, None))
            elif argument['type'] == 'tuple':
                # treat tuple elements as separate arguments
                help_arglist = ', '.join(
                    '<%s>' % type_name(get_validator(t, use_limits=False))
                    for t in argument['members'])

                def cmd(self, *args):
                    return validate_result(self._call(cname, args))
            elif argument['type'] == 'struct':
                # treat SECoP struct as keyword arguments
                # positional args will be treated in the given order
                # which is not guaranteed to be kept in SECoP
                # NOTE(review): 'optional' is looked up on the datainfo of
                # the command, not on the struct given as argument -
                # confirm that this is intended
                optional = datainfo.get('optional')
                keys = list(argument['members'])
                if optional is None:
                    optional = list(keys)
                for key in optional:
                    keys.remove(key)
                help_arglist = ', '.join(keys + ['%s=None' % k
                                                 for k in optional])
                # mandatory keys first, then the optional ones
                keys.extend(optional)

                def cmd(self, *args, **kwds):
                    if len(args) > len(keys):
                        raise ValueError('too many arguments')
                    for arg, key in zip(args, keys):
                        if key in kwds:
                            raise ValueError(
                                'got multiple values for argument %r'
                                % key)
                        kwds[key] = arg
                    return validate_result(self._call(cname, kwds))
            else:
                help_arglist = '<%s>' % type_name(
                    get_validator(argument, use_limits=False))

                def cmd(self, arg):
                    return validate_result(self._call(cname, arg))
            cmd.help_arglist = help_arglist
            cmd.__doc__ = description
            cmd.__name__ = cname
            return cmd

        old = getattr(cls, cname, None)
        if old is None:
            attrs[cname] = usermethod(makecmd(cname, **cmddict))
        elif cname != 'stop':
            # stop is handled separately, do not complain
            session.log.warning(
                'skip command %s, as it would overwrite method of %r',
                cname, cls.__name__)
    # see if secop_properties exists (the case when auto-created from the
    # SecNode) or not, where we need to get it from setup_info.
    if 'secop_properties' not in devcfg:
        devname = secnodedev._get_device_name(devcfg['secop_module'])
        devcfg['secop_properties'] = secnodedev.setup_info[devname][1].get(
            'secop_properties', {}
        )
    classname = cls.__name__ + '_' + name
    # create a new class extending SecopDevice, apply DeviceMeta in order
    # to include the added parameters
    features = devcfg['secop_properties'].get('features', [])
    mixins = [importString(mixin) for mixin in add_mixins]
    mixins.extend([FEATURES[f] for f in features if f in FEATURES])
    given_limits_params = set(devcfg['params_cfg']) & {
        'target_limits', 'target_min', 'target_max'}
    param_overrides = {}
    if given_limits_params:
        param_overrides.update((k, Override(userparam=False)) for k in given_limits_params)
        mixins.append(SecopHasLimits)
    if mixins:
        # create class to hold access methods
        newclass = DeviceMeta.__new__(
            DeviceMeta, classname + '_base', (cls,), attrs)
        mixcls = mixins[-1]
        # create class with mixins, with methods overriding access methods
        mixins.append(newclass)
        add_attrs = dict(__module__=mixcls.__module__, __qualname__=mixcls.__qualname__)
        if param_overrides:
            add_attrs['parameter_overrides'] = param_overrides
        newclass = DeviceMeta.__new__(
            DeviceMeta, classname, tuple(mixins), add_attrs)
    else:
        newclass = DeviceMeta.__new__(
            DeviceMeta, classname, (cls,), attrs)
    newclass._modified_config = devcfg  # store temporarily for __init__
    return newclass
def __new__(cls, name, **config):
    """called when an instance of the class is created but before __init__

    instead of returning a SecopDevice, we create an object of an extended
    class here (see makeDevClass)
    """
    newclass = cls.makeDevClass(name, **config)
    return Readable.__new__(newclass)
def __init__(self, name, **config):
    """apply modified config

    The config argument is not used: the device is initialized with the
    configuration stored on the class by makeDevClass.
    """
    self._attached_secnode = None
    self._read_lock = RLock()
    self._pending_updates = {}
    self._param_errors = {}
    Device.__init__(self, name, **self._modified_config)
    # was stored on the class temporarily only
    del self.__class__._modified_config
def replaceClass(self, config):
    """Replace the class of this device on the fly.

    Happens when the structure for the device has changed. A new class is
    built from ``config``; devices attaching to this one and aliases
    pointing to it are checked against the new class before it is assigned.

    :param config: the new device configuration, must contain the key
        'secop_properties'
    :raises ConfigurationError: if an attaching device requires a class the
        new class is not a subclass of
    """
    base = class_from_interface(config['secop_properties'])
    newclass = base.makeDevClass(self.name, **config)

    mismatch = False
    for attaching, required in get_attaching_devices(self):
        if not issubclass(newclass, required):
            self.log.error('can not attach %s to %s', attaching.name, self.name)
            mismatch = True
            continue
        self.log.warning('reattach %s to %s', attaching.name, self.name)
    if mismatch:
        raise ConfigurationError('device class mismatch')

    for aliasdev, required in get_aliases(self):
        if not issubclass(newclass, required):
            self.log.error('release alias %s from %s', aliasdev.name, self.name)
            aliasdev.alias = ''
        elif aliasdev.alias != self.name:
            self.log.warning('redirect alias %s to %s',
                             aliasdev.name, self.name)

    self.__class__ = newclass
    # self.__init__ is not run again, so self._config is rebuilt here
    self._config = {key.lower(): val for key, val in config.items()}
    for aname in self.attached_devices:
        self._config.pop(aname, None)
def doPreinit(self, mode):
    """Register this device at the attached SecNode, except in simulation."""
    if mode == SIMULATION:
        return
    self._attached_secnode.registerDevice(self)
def _cache_update(self, parameter):
    """Refresh the NICOS view of ``parameter`` and keep track of errors.

    :param parameter: 'status', 'value' or the name of a parameter
    :return: the result of ``self.status(0)``, ``self.read(0)`` or of the
        attribute access; None when an exception was caught
    """
    # trigger nicos cache update and trigger doRead<param>/doRead/doStatus
    # respecting methods of mixins
    try:
        if parameter == 'status':
            return self.status(0)
        if parameter == 'value':
            result = self.read(0)
        else:
            result = getattr(self, parameter)  # trigger doRead<param>
        if parameter in self._param_errors:
            # the parameter could be read again: forget its previous error
            self._param_errors.pop(parameter)
            self.status(0)  # modifying _param_errors affects status
        return result
    except Exception as e:
        if self._cache:
            self._cache.invalidate(self, parameter)
        try:
            param_item = self._attached_secnode._secnode.cache[
                self.secop_module, parameter]
            if not param_item.readerror:  # the error is not handled yet
                # this may happen when a value of a settable parameter
                # is updated to a value outside the range
                param_item.readerror = e
        except Exception:
            # best effort only: any failure while marking the secnode cache
            # item is ignored
            pass
        if not isinstance(e, ReadFailedError):
            # The SECoP spec states "ReadFailed" indicates "parameter can
            # not be read _just now_". We assume this error happens not
            # on a hardware failure (which would be HardwareError), but in
            # a foreseeable way, depending on the state of the module
            # -> do not worry the user
            self._param_errors[parameter] = make_nicos_error(e)
            self.status(0)
def _update(self, module, parameter, item):
    """Handle an update notification for ``parameter``.

    Updates for names that are neither in ``self.parameters`` nor in
    ``self._maintypes`` are ignored. While ``_inside_read`` is set, the
    update is only recorded in ``_pending_updates``; otherwise the cache
    update is done immediately under ``_read_lock``.

    :param module: not used here
    :param parameter: name of the updated parameter
    :param item: not used here
    """
    known = parameter in self.parameters or parameter in self._maintypes
    if not known:
        return
    if self._inside_read:
        # this thread is inside secnode.getParameter.
        # indicate to the getter that a cache update has to be done:
        self._pending_updates[parameter] = True
        return
    with self._read_lock:
        try:
            # do not allow to call getParameter again
            self._inside_read = True
            self._cache_update(parameter)
        finally:
            self._inside_read = False
def _defunct_error(self):
    """Build (not raise) the DefunctDevice exception for this device.

    The message depends on whether the session still has this very object
    registered under its name.
    """
    registered = session.devices.get(self.name)
    if registered == self:
        template = 'SECoP device %s no longer available'
    else:
        template = 'refers to a replaced defunct SECoP device %s'
    return DefunctDevice(template % self.name)
@usermethod
def hwread(self, param='value', maxage=0):
    """Read from hw or get from secnode cache, depending on maxage.

    The NICOS cache update for ``param`` is triggered afterwards, whether
    the read succeeded or not.

    :param param: a parameter, 'value' (default) or 'status'
    :param maxage: do not read when SECoP timestamp is more recent than
        indicated by maxage
    :return: the validated value, after doRead<param> machinery
    """
    if self._sim_intercept:
        return self._cache.get(self, param, None)
    try:
        result = self._read(param, maxage, True)
    finally:
        self._cache_update(param)
    return result
def _read(self, param, maxage=0, fromhw=True):
    """read from hw or get from secnode cache

    :param param: a parameter name, 'value' or 'status'
    :param maxage: do not read when SECoP timestamp is more recent than
        indicated by maxage; None means: never read from hw
    :param fromhw: whether to read from HW or not
        None: called from .status() or .read():
        depends on secnode.async_only
    :return: the validated value
    :raises KeyError: if param is neither a main type nor a parameter
    :raises CommunicationError: if the secnode is not online
    """
    try:
        validator = self._maintypes.get(param) or self.parameters[param].type
    except KeyError:
        raise KeyError(f'{param} is no parameter') from None
    try:
        secnode = self._attached_secnode._secnode
    except AttributeError:
        # _attached_secnode is None for a defunct device
        raise self._defunct_error() from None
    if not secnode.online:
        raise CommunicationError('no SECoP connection')
    if maxage is None:
        fromhw = False
    elif fromhw is None and not self._attached_secnode.async_only:
        fromhw = True
    value, timestamp, readerror = secnode.cache[self.secop_module, param]
    if fromhw and time.time() > (timestamp or 0) + maxage:
        with self._read_lock:
            if not self._inside_read:
                # this thread is not yet inside calling getParameter
                self._inside_read = True
                try:
                    self._pending_updates = {}
                    # NOTE(review): only the value is taken from the fresh
                    # result, readerror still stems from the cache lookup
                    # above - confirm that this is intended
                    value = secnode.getParameter(self.secop_module, param)[0]
                finally:
                    # take over what _update has recorded in the meantime
                    todo = self._pending_updates
                    self._pending_updates = {}
                    self._inside_read = False
                # updating the cache for pending updates
                # respecting overridden methods from mixins
                todo.pop(param, None)  # this is handled by the caller
                for pname in todo:
                    self._cache_update(pname)
    if readerror:
        raise make_nicos_error(readerror) from None
    try:
        return validator(value)
    except Exception:
        self.log.exception('can not set %r to %r', param, value)
        raise
def _write(self, param, value):
    """Set ``param`` of the SECoP module to ``value``.

    :return: ``value`` as given
    :raises: the error built by ``_defunct_error`` when an AttributeError
        occurs (e.g. no secnode attached)
    """
    try:
        self._attached_secnode._secnode.setParameter(
            self.secop_module, param, value)
    except AttributeError:
        raise self._defunct_error() from None
    return value
def _call(self, cname, argument=None):
    """Execute the SECoP command ``cname`` on the module of this device.

    :param cname: name of the command
    :param argument: argument handed over to the command (default None)
    :return: the first element of the result of ``execCommand``
    :raises: the error built by ``_defunct_error`` when an AttributeError
        occurs (e.g. no secnode attached), same as ``_write`` does
    """
    try:
        return self._attached_secnode._secnode.execCommand(
            self.secop_module, cname, argument)[0]
    except AttributeError:
        # a detached device has _attached_secnode == None: report it as
        # defunct instead of leaking a bare AttributeError
        raise self._defunct_error() from None
def setDefunct(self):
    """Detach this device from its SecNode and mark it as defunct.

    Only logs a warning when the device is defunct already.
    """
    if self._defunct:
        self.log.warning('device is already defunct')
        return
    self._defunct = True
    attached = self._attached_secnode
    if attached is not None:
        session.configured_devices.pop(self.name, None)
        attached.unregisterDevice(self)
    # make defunct
    former = self._adevs.pop('secnode', None)
    if former:
        former._sdevs.discard(self._name)
    self._attached_secnode = None
    self._cache = None
    self.updateStatus()
def setAlive(self, secnode):
    """Attach this device to ``secnode`` and clear the defunct state.

    :param secnode: the SecNode device; its ``_cache`` is taken over and
        this device is registered with it
    """
    self._defunct = False
    self._cache = secnode._cache
    self._attached_secnode = secnode
    self._adevs.update(secnode=secnode)
    secnode._sdevs.add(self._name)
    secnode.registerDevice(self)
    # clear defunct status
    self.updateStatus()
def doShutdown(self):
    """Make the device defunct on shutdown, unless it is defunct already."""
    if self._defunct:
        return
    self.setDefunct()
def connectionStatus(self):
    """Return an error status tuple on connection problems, else None.

    :return: (status.ERROR, 'defunct') without attached secnode,
        (status.ERROR, 'no SECoP connection') when the secnode is not
        online, None otherwise
    """
    attached = self._attached_secnode
    if not attached:
        return status.ERROR, 'defunct'
    if attached._secnode.online:
        return None
    return status.ERROR, 'no SECoP connection'
def paramWarning(self):
    """Return a WARN status tuple summarizing the stored parameter errors.

    With exactly one error the text names the parameter and the error,
    otherwise it gives the number of errors and points to showerrors().
    """
    errors = self._param_errors
    count = len(errors)
    if count != 1:
        return status.WARN, f'errors in {count} parameters: see {self.name}.showerrors()'
    param, exc = next(iter(errors.items()))
    return status.WARN, f'param {param}: {exc}'
def doStatus(self, maxage=0):
    """Return the status tuple of a device without SECoP status.

    Connection problems take precedence, then stored parameter errors;
    otherwise the status is OK with an empty text.

    :param maxage: not used here
    """
    problem = self.connectionStatus()
    if problem:
        return problem
    if not self._param_errors:
        return status.OK, ''
    return self.paramWarning()
@usermethod
def status(self, maxage=None):
    """Simple replacement for Readable.status.

    Not inherited from SecopReadable. It has to be implemented here, as it
    is called in SecopDevice._cache_update.

    :return: (status.OK, 'simulated ok') while ``_sim_intercept`` is set,
        else the result of ``_getFromCache`` for 'status'
    """
    if not self._sim_intercept:
        return self._getFromCache('status', self.doStatus, maxage)
    return status.OK, 'simulated ok'
def showerrors(self):
    """Log one info line per stored parameter error.

    The parameter name is added to the line only when the error text does
    not contain it already.
    """
    for pname, error in self._param_errors.items():
        if pname in str(error):
            label = ''
        else:
            label = f'{pname}: '
        session.log.info('%s.%s%s: %s', self.name, label, get_exc_name(error), error)
def updateStatus(self):
    """Status update without SECoP read in async_only mode.

    Delegates to ``_cache_update`` for 'status'.
    """
    parameter = 'status'
    self._cache_update(parameter)
def setConnected(self, connected):
    """React on a change of the connection state of the SecNode.

    On connect, all SECoP parameters and the main value are accessed once
    in order to rebuild the values cleared from the cache. On disconnect,
    the SECoP parameters are removed from the cache and from
    ``self._params``. In both cases the stored parameter errors are
    cleared and the status is updated.

    :param connected: True after connecting, False after disconnecting
    """
    if connected:
        # rebuild values cleared from cache
        for param in self._attached_secnode.filter_parameters(
                self.secop_module, self.parameters):
            try:
                getattr(self, param)
            except Exception as e:
                self._param_errors[param] = make_nicos_error(e)
        try:
            self.read(0)
        except AttributeError:
            # presumably for devices without read() - confirm
            pass
        # NOTE(review): this also drops the errors collected in the loop
        # above - confirm whether the clear was meant to happen first
        self._param_errors.clear()  # clear errors from previous connection
    else:
        if self._cache:
            # materialize the names first: self._params is modified inside
            # the loop, which would break a lazily evaluated filter over it
            stale = list(self._attached_secnode.filter_parameters(
                self.secop_module, self._params))
            for param in stale:
                self._cache.delete(self, param)
                self._params.pop(param)
        self._param_errors.clear()
    self.updateStatus()
def register_callback(self, parameter, f):
    """Register a callback for parameter updates.

    The function is executed every time the client receives a new value for
    the given parameter. The registration is delegated to the attached
    SecNode device.

    :param parameter: name of the parameter of this device's SECoP module
    :param f: the callback function
    """
    secnode = self._attached_secnode
    secnode.register_custom_callback(self.secop_module, parameter, f)
def unregister_callback(self, parameter, f):
    """Unregister a callback for parameter updates.

    :param parameter: name of the parameter of this device's SECoP module
    :param f: the callback function to be removed
    """
    secnode = self._attached_secnode
    secnode.unregister_custom_callback(self.secop_module, parameter, f)
# Mapping from SECoP status codes to NICOS status constants.
# division by 100: merge SECoP substates
STATUS_MAP = {
    secop_code // 100: nicos_status
    for secop_code, nicos_status in (
        (StatusType.DISABLED, status.DISABLED),
        (StatusType.IDLE, status.OK),
        (StatusType.WARN, status.WARN),
        (StatusType.BUSY, status.BUSY),
        (StatusType.ERROR, status.ERROR),
    )
}
class SecopReadable(SecopDevice, Readable):
    """Represent a SECoP "Readable"."""

    parameter_overrides = {
        # the unit is taken from the SECoP description, so it need not be
        # given in the setup file
        'unit': Override(default='', mandatory=False),
        # polling is done remotely, pollinterval is unused here
        # (may be overridden by the SECoP pollinterval)
        'pollinterval': Override(default=None, userparam=False, settable=False),
        # calculated to be slightly higher than maxage on the attached SecNodeDevice
        'maxage': Override(default=3600, userparam=False, settable=False, volatile=True),
    }

    def status(self, maxage=None):
        """Use Readable.status instead of the SecopDevice replacement."""
        return Readable.status(self, maxage)

    def doRead(self, maxage=0):
        """Return the main value.

        :raises NicosError: with text 'disabled' if reading failed and the
            status is DISABLED; else the original error is raised again
        """
        try:
            value = self._read('value', maxage, None)
        except NicosError:
            if self.doStatus(0)[0] != status.DISABLED:
                raise
            raise NicosError('disabled') from None
        return value

    def doReset(self):
        """Call the SECoP command 'reset'.

        On a KeyError from that call, ``clear_errors()`` is tried instead;
        an AttributeError from that attempt is ignored.
        """
        try:
            self._call('reset')
        except KeyError:
            try:
                self.clear_errors()
            except AttributeError:
                pass

    def doStatus(self, maxage=0):
        """Convert connection state, SECoP status and parameter errors
        into a NICOS status tuple.
        """
        problem = self.connectionStatus()
        if problem:
            return problem
        try:
            # code is a SECoP status code here (StatusType.<name> below)
            code, text = self._read('status', maxage, None)
        except NicosError as err:
            return status.ERROR, str(err)
        if StatusType.FINALIZING <= code < StatusType.ERROR:
            return status.OK, text
        if self._param_errors and code != StatusType.DISABLED:
            value_error = self._param_errors.get('value')
            # error reading value and status IDLE or WARN
            if value_error and code < StatusType.BUSY:
                return status.ERROR, f'can not read value: {get_exc_name(value_error)} - {value_error}'
            # error in any other parameter and status IDLE
            if code < StatusType.WARN:
                return self.paramWarning()
        # treat SECoP code 401 (unknown) as error - should be distinct from
        # NICOS status unknown
        return STATUS_MAP.get(code // 100, status.UNKNOWN), text

    def doReadMaxage(self):
        """Calculate maxage from the attached SecNode device."""
        secnode = self._attached_secnode
        base = secnode.maxage or 600
        # maxage should be a little above the value used in poll thread
        return base + 2 * len(secnode._polled_devs)

    def showerrors(self):
        """Log an error status, if any, followed by the parameter errors."""
        current = self.status()
        if current[0] == status.ERROR:
            session.log.info('%s.status(): %s', self.name, current[1])
        super().showerrors()

    def info(self):
        """Return the device info, with special handling of disabled
        modules and of a main value which can not be read.
        """
        # override the default NICOS behaviour here:
        # a disabled SECoP module should be ignored silently
        current = self.status(0)
        if current[0] == status.DISABLED:
            # do not display info in data file when disabled
            return []
        value_error = self._param_errors.get('value')
        if value_error is None:
            return Readable.info(self)
        # avoid calling read(), as it would fail
        value_info = DeviceMetaInfo(
            'value',
            DeviceParInfo(None, f'Error: {value_error}', '', 'general'))
        status_info = DeviceMetaInfo(
            'status',
            DeviceParInfo(current, formatStatus(current), '', 'status'))
        return [value_info, status_info]
class SecopWritable(SecopReadable, Moveable):
    """Represent the SECoP "Writable"."""

    def doReadTarget(self):
        """Return the cached target, or None if it can not be obtained."""
        try:
            target = self._read('target', None)
        except NicosError:
            # this might happen when the target is not initialized
            # if we do not catch here, Moveable.start will raise
            # when comparing with the previous value
            target = None
        return target

    def doStart(self, target):
        """Set the SECoP parameter 'target'.

        :raises: the error built by ``_defunct_error`` when an
            AttributeError occurs (e.g. no secnode attached)
        """
        try:
            self._attached_secnode._secnode.setParameter(
                self.secop_module, 'target', target)
        except AttributeError:
            raise self._defunct_error() from None

    def _getWaiters(self):
        """do not return attached secnode"""
        waiters = dict(self._adevs)
        waiters.pop('secnode', None)
        return waiters
class SecopMoveable(SecopWritable):
    """Represent the SECoP "Drivable"."""

    def doStop(self):
        """Execute the SECoP command 'stop' if the device is busy.

        A failure is logged and followed by a status update, it is not
        raised.
        """
        if self.doStatus()[0] != status.BUSY:
            return
        secnode_dev = self._attached_secnode
        try:
            secnode_dev._secnode.execCommand(self.secop_module, 'stop')
        except Exception:
            self.log.exception('error while stopping')
            self.updateStatus()
class SecopAcqChannel(Measurable, SecopReadable):
    """equivalent to AcquisitionChannel

    A channel is operated through its controller: the actions prepare,
    start, stop, resume and finish are forwarded to it.
    """

    doStatus = SecopReadable.doStatus
    # set by setController
    _controller = None
    _presetname = None

    def doInit(self, _mode):
        """Connect to the controller, or register as a waiting channel.

        The controller registers itself under the SECoP module name of the
        channel when it was initialized first. Otherwise this channel is
        stored for the controller to be picked up later.
        """
        secnode = self._attached_secnode
        cinfo = secnode._acq_controllers.pop(self.secop_module, None)
        if cinfo:
            # the controller is already initialized -> add channel to it
            name, controller = cinfo
            self.setController(name, controller)
            controller.addChannel(name, self)
        else:
            # the controller pops waiting channels by SECoP module name,
            # so this has to be the key (the device name may differ)
            secnode._acq_channels[self.secop_module] = self

    def setController(self, name, controller):
        """assign controller and remember preset name"""
        self._presetname = name
        self._controller = controller

    def doSetPreset(self, **kwds):
        """Hand all presets over to the controller and remember them."""
        self._controller.setAllPresets(**kwds)
        self._lastpreset = kwds

    def doPrepare(self):
        """Perform 'prepare' on the controller, if available."""
        try:
            self._controller.performAction('prepare', 'prepare')
        except KeyError:
            pass  # prepare is not implemented on the secnode

    def doStart(self):
        """Perform the action 'start' with the SECoP command 'go'."""
        self._controller.performAction('start', 'go')

    def doStop(self):
        """Perform 'stop' on the controller, if available."""
        try:
            self._controller.performAction('stop', 'stop')
        except KeyError:
            pass  # stop is not implemented on the secnode

    def doResume(self):
        """Perform the action 'resume' with the SECoP command 'go'."""
        self._controller.performAction('resume', 'go')

    def doFinish(self):
        """Tell the controller that the acquisition is finished."""
        return self._controller.finishAction()

    def presetInfo(self):
        """Return the preset name assigned by setController as a list."""
        return [self._presetname]

    valueInfo = Readable.valueInfo
class SecopAcqController(SecopDevice):
    """Controller part of a SECoP acquisition.

    Holds the channels by name and executes the acquisition commands on the
    SECoP module, skipping actions repeated by several channels.
    """

    # channels by name, created in initChannels
    _channels = None
    # lock and last performed action, see performAction
    # (the double underscore makes these names private to this class)
    __lock = None
    __action = None

    def initChannels(self, channels=None):
        """Create the lock and the channel dict and connect the channels.

        :param channels: dict {<channel name>: <SECoP module name>};
            nothing is connected when empty or None
        """
        self.__lock = RLock()
        self._channels = {}
        if not channels:
            return  # for SecopAcquisition
        secnode = self._attached_secnode
        for name, module in channels.items():
            # collect already registered channels
            dev = secnode._acq_channels.pop(module, None)
            if dev is None:
                # the channel is not registered (yet): leave a note under
                # its module name
                secnode._acq_controllers[module] = (name, self)
            else:
                self.addChannel(name, dev)
                dev.setController(name, self)

    def doInit(self, _mode):
        """Init the channels given in the 'acquisition_channels' property."""
        self.initChannels(self.secop_properties.get('acquisition_channels', {}))

    def addChannel(self, name, channel):
        """Store ``channel`` under ``name``."""
        self._channels[name] = channel

    def setAllPresets(self, **kwds):
        """Set goal and goal_enable on all channels.

        Channels without a preset in ``kwds`` get their goal disabled.
        UsageError and AttributeError are ignored.
        """
        for name, dev in self._channels.items():
            preset = kwds.get(name)
            try:
                if preset is None:
                    dev.goal_enable = False
                else:
                    dev.goal = preset
                    dev.goal_enable = True
            except (UsageError, AttributeError):
                pass

    def performAction(self, action, cmd):
        """execute <cmd> on secnode if <action> is not done already"""
        with self.__lock:
            # Channels and controllers forming an acquisition in SECoP
            # need to be started, stopped etc. only once. We want to skip
            # repeated actions on the controller triggered by the channels.
            # As a useful sequence of 'prepare', 'start', 'pause', 'resume',
            # 'stop', 'finish' for a single measurable does contain repeated
            # actions, we can safely skip repeated actions here.
            if action == self.__action:
                return
            # the action is remembered before the command is executed
            self.__action = action
            try:
                self._attached_secnode._secnode.execCommand(self.secop_module, cmd)
            except AttributeError:
                raise self._defunct_error() from None

    def finishAction(self):
        """Remember 'finish' as the last action (no command is executed)."""
        # make sure that a start action can be repeated after finish
        self.__action = 'finish'
class SecopAcquisition(SecopAcqChannel, SecopAcqController):
    """equivalent to SECoP Acquisition

    Channel and controller in one device: it is its own controller and its
    only channel.
    """

    _presetname = None

    def doInit(self, _mode):
        """Take the preset name from the '_acquisition_key' property
        (default 'main') and register self as controller and channel.
        """
        presetname = self.secop_properties.get('_acquisition_key', 'main')
        self._presetname = presetname
        self.log.debug('preset name %r', presetname)
        self.initChannels()
        self._controller = self
        self._channels = {presetname: self}
class SecopHasOffset(HasOffset):
    """Modified HasOffset mixin.

    goal: make the class to be accepted by the adjust command

    Value and target are converted by subtracting the offset on reading,
    and by adding it on starting.
    """

    parameter_overrides = {
        'offset': Override(prefercache=True, volatile=True),
    }

    def doRead(self, maxage=0):
        """Return the value of the next class, corrected by the offset.

        :param maxage: passed on unchanged
        """
        return super().doRead(maxage) - self.offset

    def doReadTarget(self):
        """Return the target corrected by the offset, None if unknown."""
        raw = super().doReadTarget()
        # use the value just read: a second call costs another lookup and
        # might return None
        return None if raw is None else raw - self.offset

    def doStart(self, value):
        """Start with the offset added to ``value``."""
        super().doStart(value + self.offset)

    def doWriteOffset(self, value):
        """Write the offset to NICOS and to the SECoP parameter 'offset'."""
        super().doWriteOffset(value)
        # remark: possible adjustments of limits, targets have to be done
        # in the remote implementation
        self._write('offset', value)
class SecopHasLimits(HasLimits):
    """treat target_max and/or target_min

    accept also target_limits (intermediate draft spec)

    The user limits are not stored in NICOS, they are calculated from the
    SECoP parameters, corrected by the offset if the device is a
    SecopHasOffset.
    """

    parameter_overrides = {
        'abslimits': Override(prefercache=True, mandatory=False,
                              volatile=True),
        'userlimits': Override(volatile=True),
    }

    def doReadAbslimits(self):
        """Return min and max from the 'target_datainfo' config entry.

        A missing bound is replaced by -inf or inf, respectively.
        """
        dt = self._config.get('target_datainfo', {})
        return dt.get('min', float('-inf')), dt.get('max', float('inf'))

    def doReadUserlimits(self):
        """Return the user limits calculated from the SECoP parameters.

        target_limits is preferred; else target_min and target_max are
        used, a missing one being replaced by -inf or inf.
        """
        if hasattr(self, 'target_limits'):
            min_, max_ = self.target_limits
        else:
            min_ = getattr(self, 'target_min', float('-inf'))
            max_ = getattr(self, 'target_max', float('inf'))
        offset = self.offset if isinstance(self, SecopHasOffset) else 0
        return min_ - offset, max_ - offset

    def _adjustLimitsToOffset(self, value, diff):
        """not needed, as limits are calculated from SECoP parameters"""

    def doWriteUserlimits(self, value):
        """Write the user limits to the available SECoP parameters.

        :param value: the tuple (min, max) of the new user limits
        :return: the effective user limits: a bound which could not be
            written to any SECoP parameter is replaced by the matching
            absolute limit
        """
        super().doWriteUserlimits(value)
        offset = self.offset if isinstance(self, SecopHasOffset) else 0
        min_ = value[0] + offset
        max_ = value[1] + offset
        has_limits = hasattr(self, 'target_limits')
        if has_limits:
            self.target_limits = min_, max_
        # a bound stored in target_limits is effective, it must not be
        # replaced by the absolute limit below
        if hasattr(self, 'target_min'):
            self.target_min = min_
        elif not has_limits:  # silently replace with abslimits
            min_ = self.abslimits[0]
        if hasattr(self, 'target_max'):
            self.target_max = max_
        elif not has_limits:
            max_ = self.abslimits[1]
        return min_ - offset, max_ - offset
# device classes by name of the SECoP interface class
IF_CLASSES = dict(
    Drivable=SecopMoveable,
    Writable=SecopWritable,
    Readable=SecopReadable,
    Module=SecopDevice,
    AcquisitionController=SecopAcqController,
    AcquisitionChannel=SecopAcqChannel,
    Acquisition=SecopAcquisition,
)
ALL_IF_CLASSES = {*IF_CLASSES.values()}
# mixin classes by name of the SECoP feature
FEATURES = dict(
    HasOffset=SecopHasOffset,
)