diff --git a/devices.py b/devices.py index 9c69a6d..04eb565 100644 --- a/devices.py +++ b/devices.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # ***************************************************************************** # NICOS, the Networked Instrument Control System of the MLZ # Copyright (c) 2009-2018 by the NICOS contributors (see AUTHORS) @@ -30,14 +29,14 @@ connected to a SEC node import threading from nicos import config, session from nicos.core import Override, Param, Moveable, status, POLLER, SIMULATION, DeviceAlias, \ - Device, anytype, listof -from nicos.devices.secop.devices import SecNodeDevice, NicosSecopClient -from nicos.core.utils import USER, User, createThread -from nicos.services.daemon.script import RequestError, ScriptRequest + Device, anytype, listof, MASTER +from nicos.devices.secop.devices import SecNodeDevice +from nicos.core.utils import createThread from nicos.utils.comparestrings import compare from nicos.devices.secop.devices import get_attaching_devices from nicos.commands.basic import AddSetup, CreateAllDevices, CreateDevice -from servicemanager import FrappyManager, SeaManager +from nicos.utils import loggers +from servicemanager import FrappyManager, SeaManager, Reconnect, Keep SERVICES = FrappyManager.services @@ -57,7 +56,7 @@ def applyAliasConfig(): """ # reimplemented from Session.applyAliasConfig # apply also when target dev name does not change, as the target device might have - # be exchanged in the mean time + # be exchanged in the meantime for aliasname, targets in session.alias_config.items(): if aliasname not in session.devices: continue # silently ignore @@ -92,6 +91,8 @@ def all_info(all_cfg, prefix='currently configured: '): if cfglist is None: addkwd = True else: + # if cfglist is True: + # cfglist = ['reconnect'] if isinstance(cfglist, str): cfglist = [cfglist] cfginfo = ','.join(c if isinstance(c, str) @@ -108,7 +109,7 @@ def get_frappy_config(): try: return session.devices['frappy'] except KeyError: - session.log.error("the frappy device is not available - 'frappy' setup is not loaded") + session.log.exception("the frappy device is not available - 'frappy' setup is not loaded") return None @@ -135,71 +136,124 @@ class FrappyConfig(Device): type=listof(str), default=[]), } - meanings = list(parameters) - meanings.remove('nodes') - _trigger_change = None + meanings = [n for n, p in parameters.items() if p.type is anytype] + _update_setup = 'on_frappy' _previous_shown = None _previous_state = None - _initial_config = None - _servers_loaded = False + _initial_info = None + _shutdown_event = None + _restarting = False + _within_update_setup = False def doInit(self, mode): if mode != SIMULATION and session.sessiontype != POLLER: - self._trigger_change = threading.Event() + self._shutdown_event = threading.Event() for name in self.nodes: secnode = session.devices.get(name) if secnode: secnode.uri = '' - createThread('frappy change notification', self.handle_notifications) + createThread('check frappy and sea servers', self.detect_changes) - def handle_notifications(self): - controller = session.daemon_device._controller - while True: - # we do not wait for ever here, because there might be changes - # on an unconnected service - self._trigger_change.wait(15) - self._trigger_change.clear() - while self._trigger_change.wait(2): # triggered again within 2 sec - self._trigger_change.clear() - try: - cfgs = self.check_services() - changes, state, remarks = self.to_consider(cfgs) - if state != self._previous_state and changes: - self._previous_state = state - cmd = 'frappy.has_changed() # inserted automatically when frappy or sea servers changed' - controller.new_request(ScriptRequest(cmd, None, User('guest', USER))) - except RequestError as e: - session.log.error(f'can not queue request {e!r}') + def doShutdown(self): + self._shutdown_event.set() - def to_consider(self, cfgs): - """return info from sea and frappy servers - - for a potential "please consider calling frappy(...)" message + def detect_changes(self): + before_check = before_change = prev_shown = None + cnt = 0 + while not self._shutdown_event.wait(1): + busy = bool(session.experiment.scripts or session.daemon_device._controller.queue.scripts) + if busy and cnt < 10: + # check only every 10 sec when busy + cnt += 1 + continue + cnt = 0 + need_change, changes, fm = self.to_consider() + if fm.state == before_change or not need_change: + continue + if fm.state != before_check: + # must be twice the same + before_check = fm.state + continue + if busy or fm.state == prev_shown: + continue + self.show_changes(changes) + prev_shown = fm.state + + def show_changes(self, changes): + if changes is None: + need_change, changes, fm = self.to_consider() + session.log.warning('sample environment has changed:') + args = [] + for service, cfg in changes.items(): + arg = str(cfg) + if cfg: + if isinstance(cfg, Keep): + arg = f'={cfg}' + session.log.info('keep %s: %s', service, cfg) + else: + session.log.warning('change %s to %s', service, cfg) + elif cfg == '': + session.log.warning('remove %s', service) + args.append(arg) + args = tuple(args) + session.log.warning('use frappy%r or frappy.update() to activate', args) + + def to_consider(self, cfgs=None): + """return info about a proposed changes + + :param cfgs: previous configuration + :return: <'need change' flag>, , + + the information takes into account running frappy and sea servers + and their configuration """ - error, proposed, state, remarks = FrappyManager().get_server_state(config.instrument, cfgs) + current_cfgs, target_cfgs = self.check_services() + if cfgs is None: + cfgs = current_cfgs + self._current_cfgs = cfgs + fm = FrappyManager() + proposed = fm.get_server_state(config.instrument, cfgs) changes = dict(proposed) - for service, guess in proposed.items(): - if guess is True: - changes.pop(service) - disconnected = set() - for service, info in cfgs.items(): - if info == '': - disconnected.add(service) + for service, cfg in proposed.items(): + if cfg == 'reconnect': # means: server is running, no restart needed + if service in current_cfgs: + changes.pop(service) + need_change = False + for service in SERVICES: + cfg = changes.get(service) # proposed cfg + info = cfgs.get(service) # running cfg + if not info: + if cfg == '': + changes.pop(service) + cfg = None + if target_cfgs.get(service): + need_change = True + elif info == '': if not changes.get(service): changes[service] = '' - return changes, (proposed,) + state, remarks + need_change = True + cfg = '' + if cfg and not isinstance(cfg, Keep): + need_change = True + # elif target_cfgs.get(service) != cfgs.get(service): + # need_change = True + return need_change, changes, fm def check_services(self): cfgs = {} + targets = {} for secnodename in self.nodes: secnode = session.devices.get(secnodename) if secnode: cfgs[secnode.service] = secnode.get_info() - return cfgs + targets[secnode.service] = secnode.target + return cfgs, targets - def start_services(self, main=None, stick=None, addons=None): + def start_services(self, main=None, stick=None, addons=None,): """start/stop frappy servers + :param main, stick, addons: cfg for frappy servers, '' to stop, None to keep + for example: start_services(main='xy', stick='') - restart main server with cfg='xy' - stop stick server @@ -208,67 +262,71 @@ class FrappyConfig(Device): in addition, if a newly given cfg is already used on a running server, this cfg is removed from the server (remark: cfg might be a comma separated list) """ - services = {'main': main, 'stick': stick, 'addons': addons} - for service, cfg in services.items(): - if cfg == '': - seaconn = session.devices.get(f'se_sea_{service}') - if seaconn and seaconn._attached_secnode: - try: - seaconn.communicate('frappy_remove %s' % service) - except Exception: - pass - used_cfg = {} - all_cfg = {} - new_cfg = {} - secnodes = {} - remove_cfg = [] - for service, cfginfo in services.items(): - secnodes[service] = secnode = session.devices.get('se_' + service) - chkinfo = '' - if secnode: - all_cfg[service] = chkinfo = secnode.get_info() - if cfginfo is not None: - new_cfg[service] = chkinfo = cfginfo - - # check cfg is not used twice - for cfg in chkinfo.split(','): - cfg = cfg.strip() - if cfg and cfg != 'restart': - prev = used_cfg.get(cfg) - if prev: - raise ValueError('%r can not be used in both %s and %s' % (cfg, prev, service)) - used_cfg[cfg] = service - - for service, cfginfo in reversed(list(new_cfg.items())): - if cfginfo != all_cfg.get(service, ''): - secnode = secnodes[service] + self._restarting = True + try: + services = {'main': main, 'stick': stick, 'addons': addons} + to_reconnect = {} + for service, cfg in services.items(): + if cfg == '': + seaconn = session.devices.get(f'se_sea_{service}') + if seaconn and seaconn._attached_secnode: + try: + seaconn.communicate('frappy_remove %s' % service) + except Exception: + pass + used_cfg = {} + all_cfg = {} + new_cfg = {} + secnodes = {} + remove_cfg = [] + for service, cfginfo in services.items(): + secnodes[service] = secnode = session.devices.get('se_' + service) + chkinfo = '' if secnode: - secnode('') # stop previous frappy server + all_cfg[service] = chkinfo = secnode.get_info() + if cfginfo is not None and (cfginfo != chkinfo or not isinstance(cfginfo, Reconnect)): + new_cfg[service] = chkinfo = cfginfo + if secnode: + to_reconnect[service] = secnode - if new_cfg: - for service, cfginfo in new_cfg.items(): - nodename = 'se_' + service - secnode = secnodes[service] - prev = all_cfg.get(service) - if cfginfo != prev: - if cfginfo == 'restart': - cfginfo = prev - if not cfginfo: - continue + # check cfg is not used twice + for cfg in chkinfo.split(','): + cfg = cfg.strip() + if cfg and cfg != 'restart': + prev = used_cfg.get(cfg) + if prev: + raise ValueError('%r can not be used in both %s and %s' % (cfg, prev, service)) + used_cfg[cfg] = service + + for service, cfginfo in reversed(list(new_cfg.items())): + if cfginfo != all_cfg.get(service, ''): + secnode = secnodes[service] + if secnode: + secnode('') # stop previous frappy server + + if new_cfg: + for service, cfginfo in new_cfg.items(): + nodename = 'se_' + service + secnode = secnodes[service] + prev = all_cfg.get(service) if not secnode: if not cfginfo: continue AddSetup('frappy_' + service) secnode = session.devices[nodename] secnode(cfginfo) + to_reconnect.pop(service, None) all_cfg[service] = secnode.get_info() CreateDevice(nodename) - cleanup_defunct() - CreateAllDevices() - self.set_envlist() - for secnode in remove_cfg: - secnode.disable() - self._cache.put(self, 'config', all_cfg) + cleanup_defunct() + CreateAllDevices() + for secnode in to_reconnect.values(): + secnode._secnode.connect() + self.set_envlist() + for secnode in remove_cfg: + secnode.disable() + finally: + self._restarting = False return all_cfg def __call__(self, *args, main=None, stick=None, addons=None, force=False): @@ -280,24 +338,29 @@ class FrappyConfig(Device): - addons are not changed when not given - frappy(main='') # main cfg is changed, but stick is kept - frappy('restart') # restart all frappy servers - - frappy(stick='restart') # restart stick frappy server + - frappy('reconnect') # reconnect to running frappy servers """ stickarg = stick - confirmed = SeaManager().get_cfg(config.instrument, 'sea', True).split('/', 1)[0] + + _, changes, fm = to_consider = self.to_consider() + confirmed = fm.sea.get_cfg(config.instrument, 'sea', True).split('/', 1)[0] if args: if main is not None: raise TypeError('got multiple values for main') main = args[0] if len(args) == 1: # special case: main given as single argument if main == 'restart': - stick = 'restart' - addons = 'restart' + main = self._current_cfgs.get('main') + stick = self._current_cfgs.get('stick') + addons = self._current_cfgs.get('addons') + elif main == 'reconnect': + main = None elif stick is None: # auto stick if main == '': stick = '' # remove stick with main else: stickcfg = main + 'stick' - if FrappyManager().is_cfg(config.instrument, 'stick', stickcfg): + if fm.is_cfg(config.instrument, 'stick', stickcfg): # if a default stick is available, start this also stick = stickcfg else: @@ -311,7 +374,7 @@ class FrappyConfig(Device): raise TypeError('got multiple values for addons') addons = ','.join(alist) elif main is None and stick is None and addons is None: # bare frappy() command - self.show_config(self.check_services(), True) + self.show_config(None, True, to_consider=to_consider) return if confirmed and confirmed != main and main not in (None, 'restart') and not force: session.log.warning('%r is plugged to the cryostat control rack', confirmed) @@ -320,80 +383,53 @@ class FrappyConfig(Device): raise TypeError('refuse to override plugged device') self.show_config(self.start_services(main, stick, addons)) - def show_config(self, allcfg, show_server_state=False): - changes, state, remarks = self.to_consider(allcfg) + def show_config(self, allcfg, show_server_state=False, to_consider=None): + need_change, changes, fm = to_consider or self.to_consider(allcfg) if show_server_state == 'auto': - show_server_state = state != self._previous_shown + show_server_state = fm.state != self._previous_shown and need_change if show_server_state: - proposed, frappycfgs, seacfgs = state rows = [['server', 'frappy', 'sea', '']] - for key, remark in remarks.items(): + for key, remark in fm.remarks.items(): rows.append([key if key in ('main', 'stick') else 'addons', - frappycfgs.get(key, ''), seacfgs.get(key, ''), remark]) + fm.frappy_cfgs.get(key, ''), fm.sea_cfgs.get(key, ''), remark]) wid = [max(len(v) for v in column) for column in zip(*rows)] # insert title underlines rows.insert(1, ['-' * w for w in wid[:-1]] + ['']) for row in rows: session.log.info('%s', ' '.join(v.ljust(w) for w, v in zip(wid, row))) session.log.info('') - # remove 'frappy.has_changed()' commands in script queue - controller = session.daemon_device._controller - controller.block_requests(r['reqid'] for r in controller.get_queue() - if r['script'].startswith('frappy.has_changed()')) - self._previous_state = self._previous_shown = state - session.log.info(all_info(allcfg)) + self._previous_state = self._previous_shown = fm.state + session.log.info(all_info(self._current_cfgs)) - if changes: - info = all_info(changes, 'proposed cfg changes: ') - session.log.info(info) - session.log.warning('please consider to call: frappy.update() for doing above changes') - if '?' in info: - session.log.warning("but create cfg files first for items marked with '?'") + if need_change: + self.show_changes(changes) - def update(self): - changes = self.to_consider(self.check_services())[0] + def update(self, *args): + if args: + changes = {k: (Keep(v[1:]) if v.startswith('=') else v) for k, v in zip(SERVICES, args) if v is not None} + else: + changes = self.to_consider()[1] self.show_config(self.start_services(**changes)) - def initial_restart_cfg(self, service): - """get cfg for (re)start of the service - - returns: - cfg, when the server has to (re)started with a new cfg - True, when the server is running and does not need a restart - None, when the server is not running, but does not need a restart - """ - if self._servers_loaded: - return None - if self._initial_config is None: + def get_init_info(self, service): + """check whether a connect of this service is required""" + if self._initial_info is None: # we do this only once for all services fm = FrappyManager() running = fm.get_cfg(config.instrument, None) cache = self._getCache() cfgs = {} for serv, secnode in zip(fm.services, self.nodes): - cfg = running.get(serv) - if not cfg and cache: - cfg = cache.get(secnode, 'value') - if cfg: - cfgs[serv] = cfg - running_main = running.get('main') - if running_main and running_main != cfgs.get('main'): - # new main device: clear old stick - running_stick = running.get('stick') - if running_stick: - cfgs['stick'] = running_stick - else: - cfgs.pop('stick', None) - error, proposed, state, remarks = fm.get_server_state(config.instrument, cfgs) - self._initial_config = proposed - if not error: + if cache: + cfg = cache.get(secnode, 'previous_config', '') + if cfg: + cfgs[serv] = cfg + self._initial_info = {s: (cfgs.get(s), running.get(s)) for s in fm.services} + fm.get_server_state(config.instrument, cfgs) + if not fm.error: # do not show server state on startup - self._previous_state = self._previous_shown = state - return self._initial_config.get(service) - - def has_changed(self, show_server_state='auto'): - self._servers_loaded = True - self.show_config(self.check_services(), show_server_state) + self._previous_state = self._previous_shown = fm.state + return self._initial_info[service] def remove_aliases(self): for meaning in self.meanings: @@ -485,6 +521,9 @@ class FrappyConfig(Device): session.log.debug('change alias %r -> %r', aliasname, devname) else: session.log.debug('create alias %r -> %r', aliasname, devname) + session.cache.put(aliasname, 'visibility', dev.visibility) + session.cache.put(aliasname, 'loglevel', dev.loglevel) + session.cache.put(aliasname, 'description', dev.description) aliasdev = session.createDevice(aliasname, recreate=True, explicit=True) aliasdev.alias = devname if aliasnames: @@ -532,10 +571,13 @@ class FrappyNode(SecNodeDevice, Moveable): 'param_category': Param("category of parameters\n\n" "set to 'general' if all parameters should appear in the datafile header", type=str, default='', settable=True), + 'quiet_init': Param('flag to set loglevel to error while initializing', + type=bool, default=True, settable=True) } _cfgvalue = None _lastcfg = None + __log_recording = () def doStart(self, value): if value == 'None': @@ -548,16 +590,17 @@ class FrappyNode(SecNodeDevice, Moveable): else: fc = session.devices.get('frappy') if fc: - cfg = fc.initial_restart_cfg(self.service) - if isinstance(cfg, str): # may also be None or True - self.restart(cfg) - if cfg is None: # None means: server is not running, and does not need to be restarted - self._disconnect() + cfg, running = fc.get_init_info(self.service) + self._cfgvalue = running + self._setROParam('target', cfg) + if cfg and (':' not in cfg and cfg != running): + self._set_status(status.ERROR, 'cfg changed') return - try: - self._connect() - except Exception: - pass + if self.uri: + try: + self._connect() + except Exception: + pass def doStop(self): """never busy""" @@ -568,10 +611,6 @@ class FrappyNode(SecNodeDevice, Moveable): def createDevices(self): cfg = self.read() super().createDevices() - if cfg != self._lastcfg: - fc = get_frappy_config() - if fc: - fc._trigger_change.set() if self.param_category: for devname, (_, devcfg) in self.setup_info.items(): params_cfg = devcfg['params_cfg'] @@ -581,21 +620,59 @@ class FrappyNode(SecNodeDevice, Moveable): if not pinfo.category: pinfo.category = self.param_category + def makeDynamicDevices(self, setup_info): + patched_loggers = [] + if self.quiet_init: + for devname, (_, devcfg) in setup_info.items(): + log = session.getLogger(devname) + if log not in patched_loggers: + result = [loggers.INFO] # default level + patched_loggers.append((log, result)) + log.setLevel(loggers.ERROR) + # avoid level change when the loglevel parameter is treated + # store level instead in result + log.__dict__['setLevel'] = result.append + try: + super().makeDynamicDevices(setup_info) + finally: + for log, result in patched_loggers: + log.__dict__.pop('setLevel', None) # re-enable setLevel + log.setLevel(result[-1]) # set to stored or default value + + def showInitLog(self): + for devname, record in self.__log_recording: + session.getLogger(devname).handle(record) + self.__log_recording = () + def nodeStateChange(self, online, state): - super().nodeStateChange(online, state) + print(f'NODE {self.service} {online} {state}') + # self.log.info('NODE %r %r', online, state) if online: + super().nodeStateChange(online, state) if self._cfgvalue is None: - self._cfgvalue = FrappyManager().get_cfg(config.instrument, self.service) - if not self._cfgvalue: + running_cfg = FrappyManager().get_cfg(config.instrument, self.service) + if running_cfg: + if running_cfg != self.target: + self.log.warning(f'server info {running_cfg!r} does not match target cfg {self.target!r}') + self._cfgvalue = running_cfg + else: self._cfgvalue = self.uri else: - self._cfgvalue = None - cfg = self.read() - if self._lastcfg != cfg: - self._lastcfg = cfg - fc = get_frappy_config() - if fc: - fc._trigger_change.set() + if self.target == self._cfgvalue: + # SecNodeDevice.nodeStateChange will change status to 'reconnecting' + super().nodeStateChange(online, state) + else: + # do not reconnect + self._cfgvalue = None + + def descriptiveDataChange(self, module, description): + running_cfg = FrappyManager().get_cfg(config.instrument, self.service) + if not running_cfg or running_cfg == self.target: + super().descriptiveDataChange(module, description) + else: + self._disconnect(True) + self._cfgvalue = running_cfg + self._set_status(status.ERROR, 'cfg changed') def disable(self): seaconn = session.devices.get('sea_%s' % self.service) @@ -615,47 +692,56 @@ class FrappyNode(SecNodeDevice, Moveable): """ if cfg is None: cfg = self._cfgvalue + if cfg is None: + self.log.error('can not restart - previous cfg unknown') + return ins = config.instrument fm = FrappyManager() info = fm.get_ins_info(ins) running_cfg = fm.get_cfg(ins, self.service) or '' - if cfg is None: - self.log.error('can not restart - previous cfg unknown') - return if cfg != running_cfg: self.disable() if running_cfg: self._disconnect() + session.log.info('stop frappy_%s %r %r', self.service, running_cfg, cfg) fm.do_stop(ins, self.service) - is_cfg = cfg and ':' not in cfg - if is_cfg: - available_cfg = None - for cfgitem in cfg.split(','): - if not fm.is_cfg(config.instrument, self.service, cfgitem): - if available_cfg is None: - available_cfg = fm.all_cfg(config.instrument, self.service) - suggestions = suggest(cfgitem, available_cfg) - if suggestions: - session.log.error('%s unknown, did you mean: %s' % (cfgitem, ', '.join(suggestions))) - if available_cfg is not None: - raise ValueError('use "frappy_list()" to get a list of valid frappy configurations') - uri = 'localhost:%d' % info[self.service] - else: - uri = cfg - if uri != self.uri: - self.uri = '' # disconnect - if uri: + try: + is_cfg = cfg and ':' not in cfg if is_cfg: - fm.do_start(ins, self.service, cfg, logger=self.log) - self.uri = uri # connect - self._cfgvalue = cfg - if self._cache: - self._cache.put(self, 'value', cfg) - self._setROParam('target', cfg) + if cfg == 'reconnect': + is_cfg = False + else: + available_cfg = None + for cfgitem in cfg.split(','): + if not fm.is_cfg(config.instrument, self.service, cfgitem): + if available_cfg is None: + available_cfg = fm.all_cfg(config.instrument, self.service) + suggestions = suggest(cfgitem, available_cfg) + if suggestions: + session.log.error('%s unknown, did you mean: %s' % (cfgitem, ', '.join(suggestions))) + if available_cfg is not None: + raise ValueError('use "frappy_list()" to get a list of valid frappy configurations') + uri = 'localhost:%d' % info[self.service] + else: + uri = cfg + if uri != self.uri: + self.uri = '' # disconnect + if uri: + if is_cfg: + session.log.info('start frappy_%s', self.service) + fm.do_start(ins, self.service, cfg, logger=self.log) + self.uri = uri # connect + self._cfgvalue = cfg + if self._cache: + self._cache.put(self, 'value', cfg) + self._setROParam('target', cfg) + finally: + self._cache.put(self, 'previous_config', self._cfgvalue or self.uri) - def _disconnect(self): + def _disconnect(self, keeptarget=False): super()._disconnect() - self._setROParam('target', '') + if not keeptarget: + self._setROParam('target', '') def get_info(self): result = self.doRead() or '' diff --git a/setups/frappy.py b/setups/frappy.py index c9006f7..a8345c8 100644 --- a/setups/frappy.py +++ b/setups/frappy.py @@ -18,7 +18,7 @@ devices = dict( # device, using the given importance number, with similar values as # given by the SECoP standard (10: instrument, 20: cryostat, 30: insert) temperature = { # the SECoP meaning - 'alias': ['Ts', 'temperature'], # the name(s) to be given to the alias + 'alias': 'Ts', # the name(s) to be given to the alias 'targets': # possible devices in addition with importance {'se_ts': 20, 'se_tt': 19, 'se_tm': 18}, }, @@ -28,7 +28,7 @@ devices = dict( 'drivable_only': True, }, magneticfield = { - 'alias': ['B', 'magfield'], + 'alias': 'B', 'targets': {'se_mf': 20}, }, pressure = { @@ -62,5 +62,4 @@ printinfo(" frappy('') # remove main SE apparatus") printinfo(" frappy() # show the current SE configuration") printinfo("=======================================================================================") set_se_list() -frappy.has_changed() '''