revert commits done before MZ holidays

they are all not neccessary for SINQ SE operation

Change-Id: Ic9adcccf685752ab90bb6b86005ac8e04b302855
This commit is contained in:
zolliker 2023-07-06 08:03:15 +02:00
parent 975593dd6b
commit d2885bdd72
23 changed files with 234 additions and 1236 deletions

View File

@ -53,8 +53,6 @@ disable=missing-docstring
,unidiomatic-typecheck
,undefined-loop-variable
,consider-using-f-string
,use-dict-literal
[REPORTS]
# Set the output format. Available formats are text, parseable, colorized, msvs

View File

@ -31,7 +31,6 @@ sys.path.insert(0, path.abspath(path.join(path.dirname(__file__), '..')))
import logging
from mlzlog import ColoredConsoleHandler
from frappy.gui.qt import QApplication
from frappy.gui.cfg_editor.mainwindow import MainWindow

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
# pylint: disable=invalid-name
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
@ -22,8 +23,8 @@
#
# *****************************************************************************
import argparse
import sys
import argparse
from os import path
# Add import path for inplace usage
@ -60,9 +61,8 @@ def parseArgv(argv):
action='store',
help="comma separated list of cfg files,\n"
"defaults to <name_of_the_instance>.\n"
"cfgfiles given without '.cfg' extension are searched"
" in the configuration directory,"
" else they are treated as path names",
"cfgfiles given without '.cfg' extension are searched in the configuration directory, "
"else they are treated as path names",
default=None)
parser.add_argument('-g',
'--gencfg',
@ -96,13 +96,15 @@ def main(argv=None):
generalConfig.init(args.gencfg)
logger.init(loglevel)
srv = Server(args.name, logger.log, cfgfiles=args.cfgfiles,
interface=args.port, testonly=args.test)
srv = Server(args.name, logger.log, cfgfiles=args.cfgfiles, interface=args.port, testonly=args.test)
if args.daemonize:
srv.start()
else:
srv.run()
try:
srv.run()
except KeyboardInterrupt:
pass
if __name__ == '__main__':

View File

@ -1,43 +0,0 @@
description = """
3He system in Lab ...
"""
Node('mlz_seop',
description,
'tcp://10767',
)
Mod('cell',
'frappy_mlz.seop.Cell',
'interface module to the driver',
config_directory = '/home/jcns/daemon/config',
)
Mod('afp',
'frappy_mlz.seop.Afp',
'controls the afp flip of the cell',
cell = 'cell'
)
Mod('nmr',
'frappy_mlz.seop.Nmr',
'controls the ',
cell = 'cell'
)
fitparams = [
('amplitude', 'V'),
('T1', 's'),
('T2', 's'),
('b', ''),
('frequency', 'Hz'),
('phase', 'deg'),
]
for param, unit in fitparams:
Mod(f'nmr_{param.lower()}',
'frappy_mlz.seop.FitParam',
f'fittet parameter {param} of NMR',
cell = 'cell',
value = Param(unit=unit),
sigma = Param(unit=unit),
param = param,
)

View File

@ -10,22 +10,6 @@ The needed fields are Equipment id (1st argument), description (this)
'tcp://10768',
)
Mod('attachtest',
'frappy_demo.test.WithAtt',
'test attached',
att = 'LN2',
)
Mod('pinata',
'frappy_demo.test.Pin',
'scan test',
)
Mod('recursive',
'frappy_demo.test.RecPin',
'scan test',
)
Mod('LN2',
'frappy_demo.test.LN2',
'random value between 0..100%',

46
debian/changelog vendored
View File

@ -1,49 +1,3 @@
frappy-core (0.17.13) focal; urgency=medium
[ Alexander Zaft ]
* add egg-info to gitignore
[ Markus Zolliker ]
* GUI bugfix: use isChecked instead of checkState in BoolInput
* frappy_psi.mercury/triton: add control_off command
* frappy_psi.phytron: rename reset_error to clear_errors
* frappy.mixins.HasOutputModule
* frappy_psi.mercury: proper handling of control_active
* add a hook for reads to be done initially
* frappy_psi.triton: fix HeaterOutput.limit
* frappy_psi.magfield: bug fix
* frappy_psi.sea: bug fixes
[ Alexander Zaft ]
* server: fix systemd variable scope
-- Alexander Zaft <jenkins@frm2.tum.de> Tue, 20 Jun 2023 14:38:00 +0200
frappy-core (0.17.12) focal; urgency=medium
[ Alexander Zaft ]
* Warn about duplicate module definitions in a file
* Add influences property to parameters/commands
[ Markus Zolliker ]
* frappy.client: dummy logger is missing 'exception' method
[ Alexander Zaft ]
* Add SEOP He3-polarization device
* Typo in influences description
* seop: fix fitparam command
[ Markus Zolliker ]
* silently catches error in systemd.daemon.notify
[ Alexander Zaft ]
* io: add option to retry first ident request
* config: fix merge_modules
* io: followup fix for retry-first-ident
* entangle: fix tango guards for pytango 9.3
-- Alexander Zaft <jenkins@frm2.tum.de> Tue, 13 Jun 2023 06:51:27 +0200
frappy-core (0.17.11) focal; urgency=medium
[ Alexander Zaft ]

View File

@ -1,41 +0,0 @@
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Alexander Zaft <a.zaft@fz-juelich.de>
#
# *****************************************************************************
from .core import Module
class Pinata(Module):
"""Base class for scanning conections and adding modules accordingly.
Like a piñata. You poke it, and modules fall out.
To use it, subclass it for your connection type and override the function
'scanModules'. For each module you want to register, you should yield the
modules name and its config options.
The connection will then be scanned during server startup.
"""
export = False
# POKE
def scanModules(self):
"""yield (modname, options) for each module the Pinata should create.
Options has to include keys for class and the config for the module.
"""
raise NotImplementedError

View File

@ -401,15 +401,10 @@ class UniqueObject:
def merge_status(*args):
"""merge status
for combining stati of different mixins
- the status with biggest code wins
- texts matching maximal code are joined with ', '
- if texts already contain ', ', it is considered as composed by
individual texts and duplication is avoided. when commas are used
for other purposes, the behaviour might be surprising
the status with biggest code wins
texts matching maximal code are joined with ', '
"""
maxcode = max(a[0] for a in args)
merged = [a[1] for a in args if a[0] == maxcode and a[1]]
# use dict instead of set for preserving order
merged = {m: True for mm in merged for m in mm.split(', ')}
return maxcode, ', '.join(merged)

View File

@ -329,6 +329,7 @@ class Module(HasAccessibles):
# reference to the dispatcher (used for sending async updates)
DISPATCHER = None
attachedModules = None
pollInfo = None
triggerPoll = None # trigger event for polls. used on io modules and modules without io
@ -346,9 +347,7 @@ class Module(HasAccessibles):
self.accessLock = threading.RLock() # for read_* / write_* methods
self.updateLock = threading.RLock() # for announceUpdate
self.polledModules = [] # modules polled by thread started in self.startModules
self.attachedModules = {}
errors = []
self._isinitialized = False
# handle module properties
# 1) make local copies of properties
@ -640,13 +639,6 @@ class Module(HasAccessibles):
all parameters are polled once
"""
def shutdownModule(self):
"""called when the sever shuts down
any cleanup-work should be performed here, like closing threads and
saving data.
"""
def doPoll(self):
"""polls important parameters like value and status
@ -940,12 +932,10 @@ class Attached(Property):
def __get__(self, obj, owner):
if obj is None:
return self
if self.name not in obj.attachedModules:
modobj = obj.DISPATCHER.get_module(super().__get__(obj, owner))
if not isinstance(modobj, self.basecls):
raise ConfigError(f'attached module {self.name}={modobj.name!r} '\
f'must inherit from {self.basecls.__qualname__!r}')
obj.attachedModules[self.name] = modobj
if obj.attachedModules is None:
# return the name of the module (called from Server on startup)
return super().__get__(obj, owner)
# return the module (called after startup)
return obj.attachedModules.get(self.name) # return None if not given
def copy(self):

View File

@ -540,6 +540,7 @@ class Limit(Parameter):
if self.hasDatatype():
return # the programmer is responsible that a given datatype is correct
postfix = self.name.rpartition('_')[-1]
postfix = self.name.rpartition('_')[-1]
if postfix == 'limits':
self.datatype = TupleOf(datatype, datatype)
self.default = (datatype.min, datatype.max)
@ -561,7 +562,6 @@ PREDEFINED_ACCESSIBLES = {
'unit': Parameter, # reserved name
'loglevel': Parameter, # reserved name
'mode': Parameter, # reserved name
'ctrlpars': Parameter, # spec to be confirmed
'stop': Command,
'reset': Command,
'go': Command,

View File

@ -39,18 +39,16 @@ Interface to the modules:
"""
import threading
import traceback
from collections import OrderedDict
from time import time as currenttime
from frappy.errors import NoSuchCommandError, NoSuchModuleError, \
NoSuchParameterError, ProtocolError, ReadOnlyError, ConfigError
NoSuchParameterError, ProtocolError, ReadOnlyError
from frappy.params import Parameter
from frappy.protocol.messages import COMMANDREPLY, DESCRIPTIONREPLY, \
DISABLEEVENTSREPLY, ENABLEEVENTSREPLY, ERRORPREFIX, EVENTREPLY, \
HEARTBEATREPLY, IDENTREPLY, IDENTREQUEST, READREPLY, WRITEREPLY, \
LOGGING_REPLY, LOG_EVENT
from frappy.lib import get_class
def make_update(modulename, pobj):
@ -86,13 +84,6 @@ class Dispatcher:
self.name = name
self.restart = srv.restart
self.shutdown = srv.shutdown
# handle to server
self.srv = srv
# set of modules that failed creation
self.failed_modules = set()
# list of errors that occured during initialization
self.errors = []
self.traceback_counter = 0
def broadcast_event(self, msg, reallyall=False):
"""broadcasts a msg to all active connections
@ -156,92 +147,11 @@ class Dispatcher:
self._export.append(modulename)
def get_module(self, modulename):
""" Returns a fully initialized module. Or None, if something went
wrong during instatiating/initializing the module."""
modobj = self.get_module_instance(modulename)
if modobj is None:
return None
if modobj._isinitialized:
return modobj
# also call earlyInit on the modules
self.log.info('initializing module %r', modulename) # TODO: change to debug
try:
modobj.earlyInit()
if not modobj.earlyInitDone:
self.errors.append(f'{modobj.earlyInit.__qualname__} was not called, probably missing super call')
modobj.initModule()
if not modobj.initModuleDone:
self.errors.append(f'{modobj.initModule.__qualname__} was not called, probably missing super call')
except Exception as e:
if self.traceback_counter == 0:
self.log.exception(traceback.format_exc())
self.traceback_counter += 1
self.errors.append(f'error initializing {modulename}: {e!r}')
modobj._isinitialized = True
self.log.info('initialized module %r', modulename) # TODO: change to debug
return modobj
def get_module_instance(self, modulename):
""" Returns the module in its current initialization state or creates a
new uninitialized modle to return.
When creating a new module, srv.module_config is accessed to get the
modules configuration.
"""
if modulename in self._modules:
return self._modules[modulename]
if modulename in list(self._modules.values()):
# it's actually already the module object
return modulename
# create module from srv.module_cfg, store and return
self.log.info('registering module %r', modulename)
opts = self.srv.module_cfg.get(modulename, None)
if opts is None:
raise NoSuchModuleError(f'Module {modulename!r} does not exist on this SEC-Node!')
pymodule = None
try: # pylint: disable=no-else-return
classname = opts.pop('cls')
if isinstance(classname, str):
pymodule = classname.rpartition('.')[0]
if pymodule in self.failed_modules:
# creation has failed already once, do not try again
return None
cls = get_class(classname)
else:
pymodule = classname.__module__
if pymodule in self.failed_modules:
# creation has failed already once, do not try again
return None
cls = classname
except Exception as e:
if str(e) == 'no such class':
self.errors.append(f'{classname} not found')
else:
self.failed_modules.add(pymodule)
if self.traceback_counter == 0:
self.log.exception(traceback.format_exc())
self.traceback_counter += 1
self.errors.append(f'error importing {classname}')
return None
else:
try:
modobj = cls(modulename, self.log.getChild(modulename), opts, self.srv)
except ConfigError as e:
self.errors.append(f'error creating module {modulename}:')
for errtxt in e.args[0] if isinstance(e.args[0], list) else [e.args[0]]:
self.errors.append(' ' + errtxt)
modobj = None
except Exception as e:
if self.traceback_counter == 0:
self.log.exception(traceback.format_exc())
self.traceback_counter += 1
self.errors.append(f'error creating {modulename}')
modobj = None
self.register_module(modobj, modulename, modobj.export)
self.srv.modules[modulename] = modobj # IS HERE THE CORRECT PLACE?
return modobj
raise NoSuchModuleError(f'Module {modulename!r} does not exist on this SEC-Node!')
def remove_module(self, modulename_or_obj):
moduleobj = self.get_module(modulename_or_obj)
@ -273,7 +183,6 @@ class Dispatcher:
def get_descriptive_data(self, specifier):
"""returns a python object which upon serialisation results in the descriptive data"""
specifier = specifier or ''
modules = {}
result = {'modules': modules}
for modulename in self._export:
@ -285,7 +194,7 @@ class Dispatcher:
mod_desc.update(module.exportProperties())
mod_desc.pop('export', False)
modules[modulename] = mod_desc
modname, _, pname = specifier.partition(':')
modname, _, pname = (specifier or '').partition(':')
if modname in modules: # extension to SECoP standard: description of a single module
result = modules[modname]
if pname in result['accessibles']: # extension to SECoP standard: description of a single accessible

View File

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
@ -23,16 +24,16 @@
"""Define helpers"""
import os
import signal
import sys
import traceback
from collections import OrderedDict
from frappy.config import load_config
from frappy.errors import ConfigError
from frappy.dynamic import Pinata
from frappy.lib import formatException, generalConfig, get_class, mkthread
from frappy.errors import ConfigError, SECoPError
from frappy.lib import formatException, get_class, generalConfig
from frappy.lib.multievent import MultiEvent
from frappy.params import PREDEFINED_ACCESSIBLES
from frappy.modules import Attached
from frappy.config import load_config
try:
from daemon import DaemonContext
@ -105,12 +106,6 @@ class Server:
self._cfgfiles = cfgfiles
self._pidfile = os.path.join(generalConfig.piddir, name + '.pid')
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
def signal_handler(self, _num, _frame):
if hasattr(self, 'interface') and self.interface:
self.shutdown()
def start(self):
if not DaemonContext:
@ -132,18 +127,17 @@ class Server:
return f"{cls.__name__} class don't know how to handle option(s): {', '.join(options)}"
def restart_hook(self):
"""Actions to be done on restart. May be overridden by a subclass."""
pass
def run(self):
global systemd # pylint: disable=global-statement
while self._restart:
self._restart = False
try:
# TODO: make systemd notifications configurable
if systemd:
if systemd: # pylint: disable=used-before-assignment
systemd.daemon.notify("STATUS=initializing")
except Exception:
systemd = None
systemd = None # pylint: disable=redefined-outer-name
try:
self._processCfg()
if self._testonly:
@ -162,27 +156,13 @@ class Server:
self.log.info('startup done, handling transport messages')
if systemd:
systemd.daemon.notify("READY=1\nSTATUS=accepting requests")
t = mkthread(self.interface.serve_forever)
# we wait here on the thread finishing, which means we got a
# signal to shut down or an exception was raised
# TODO: get the exception (and re-raise?)
t.join()
self.interface = None # fine due to the semantics of 'with'
# server_close() called by 'with'
self.log.info(f'stopped listenning, cleaning up'
f' {len(self.modules)} modules')
# if systemd:
# if self._restart:
# systemd.daemon.notify('RELOADING=1')
# else:
# systemd.daemon.notify('STOPPING=1')
for name in self._getSortedModules():
self.modules[name].shutdownModule()
self.interface.serve_forever()
self.interface.server_close()
if self._restart:
self.restart_hook()
self.log.info('restarting')
self.log.info('shut down')
self.log.info('restart')
else:
self.log.info('shut down')
def restart(self):
if not self._restart:
@ -194,57 +174,85 @@ class Server:
self.interface.shutdown()
def _processCfg(self):
"""Processes the module configuration.
All modules specified in the config file and read recursively from
Pinata class Modules are instantiated, initialized and started by the
end of this function.
If there are errors that occur, they will be collected and emitted
together in the end.
"""
errors = []
opts = dict(self.node_cfg)
cls = get_class(opts.pop('cls'))
self.dispatcher = cls(opts.pop('name', self._cfgfiles),
self.log.getChild('dispatcher'), opts, self)
self.dispatcher = cls(opts.pop('name', self._cfgfiles), self.log.getChild('dispatcher'), opts, self)
if opts:
self.dispatcher.errors.append(self.unknown_options(cls, opts))
errors.append(self.unknown_options(cls, opts))
self.modules = OrderedDict()
failure_traceback = None # traceback for the first error
failed = set() # python modules failed to load
self.lastError = None
for modname, options in self.module_cfg.items():
opts = dict(options)
pymodule = None
try:
classname = opts.pop('cls')
pymodule = classname.rpartition('.')[0]
if pymodule in failed:
continue
cls = get_class(classname)
except Exception as e:
if str(e) == 'no such class':
errors.append(f'{classname} not found')
else:
failed.add(pymodule)
if failure_traceback is None:
failure_traceback = traceback.format_exc()
errors.append(f'error importing {classname}')
else:
try:
modobj = cls(modname, self.log.getChild(modname), opts, self)
self.modules[modname] = modobj
except ConfigError as e:
errors.append(f'error creating module {modname}:')
for errtxt in e.args[0] if isinstance(e.args[0], list) else [e.args[0]]:
errors.append(' ' + errtxt)
except Exception:
if failure_traceback is None:
failure_traceback = traceback.format_exc()
errors.append(f'error creating {modname}')
# create and initialize modules
todos = list(self.module_cfg.items())
while todos:
modname, options = todos.pop(0)
if modname in self.modules:
# already created by Dispatcher (via Attached)
continue
# For Pinata modules: we need to access this in Dispatcher.get_module
self.module_cfg[modname] = dict(options)
modobj = self.dispatcher.get_module_instance(modname) # lazy
if modobj is None:
self.log.debug('Module %s returned None', modname)
continue
self.modules[modname] = modobj
if isinstance(modobj, Pinata):
# scan for dynamic devices
pinata = self.dispatcher.get_module(modname)
pinata_modules = list(pinata.scanModules())
for name, _cfg in pinata_modules:
if name in self.module_cfg:
self.log.error('Module %s, from pinata %s, already'
' exists in config file!', name, modname)
self.log.info('Pinata %s found %d modules', modname, len(pinata_modules))
todos.extend(pinata_modules)
missing_super = set()
# all objs created, now start them up and interconnect
for modname, modobj in self.modules.items():
self.log.info('registering module %r', modname)
self.dispatcher.register_module(modobj, modname, modobj.export)
# also call earlyInit on the modules
modobj.earlyInit()
if not modobj.earlyInitDone:
missing_super.add(f'{modobj.earlyInit.__qualname__} was not called, probably missing super call')
# initialize all modules by getting them with Dispatcher.get_module,
# which is done in the get_descriptive data
# TODO: caching, to not make this extra work
self.dispatcher.get_descriptive_data('')
# =========== All modules are initialized ===========
# handle attached modules
for modname, modobj in self.modules.items():
attached_modules = {}
for propname, propobj in modobj.propertyDict.items():
if isinstance(propobj, Attached):
try:
attname = getattr(modobj, propname)
if attname: # attached module specified in cfg file
attobj = self.dispatcher.get_module(attname)
if isinstance(attobj, propobj.basecls):
attached_modules[propname] = attobj
else:
errors.append(f'attached module {propname}={attname!r} '\
f'must inherit from {propobj.basecls.__qualname__!r}')
except SECoPError as e:
errors.append(f'module {modname}, attached {propname}: {str(e)}')
modobj.attachedModules = attached_modules
# all errors from initialization process
errors = self.dispatcher.errors
# call init on each module after registering all
for modname, modobj in self.modules.items():
try:
modobj.initModule()
if not modobj.initModuleDone:
missing_super.add(f'{modobj.initModule.__qualname__} was not called, probably missing super call')
except Exception as e:
if failure_traceback is None:
failure_traceback = traceback.format_exc()
errors.append(f'error initializing {modname}: {e!r}')
if not self._testonly:
start_events = MultiEvent(default_timeout=30)
@ -253,7 +261,8 @@ class Server:
start_events.name = f'module {modname}'
modobj.startModule(start_events)
if not modobj.startModuleDone:
errors.append(f'{modobj.startModule.__qualname__} was not called, probably missing super call')
missing_super.add(f'{modobj.startModule.__qualname__} was not called, probably missing super call')
errors.extend(missing_super)
if errors:
for errtxt in errors:
@ -262,6 +271,8 @@ class Server:
# print a list of config errors to stderr
sys.stderr.write('\n'.join(errors))
sys.stderr.write('\n')
if failure_traceback:
sys.stderr.write(failure_traceback)
sys.exit(1)
if self._testonly:
@ -288,41 +299,3 @@ class Server:
# history_path = os.environ.get('ALTERNATIVE_HISTORY')
# if history_path:
# from frappy_<xx>.historywriter import ... etc.
def _getSortedModules(self):
"""Sort modules topologically by inverse dependency.
Example: if there is an IO device A and module B depends on it, then
the result will be [B, A].
Right now, if the dependency graph is not a DAG, we give up and return
the unvisited nodes to be dismantled at the end.
Taken from Introduction to Algorithms [CLRS].
"""
def go(name):
if name in done: # visiting a node
return True
if name in visited:
visited.add(name)
return False # cycle in dependencies -> fail
visited.add(name)
if name in unmarked:
unmarked.remove(name)
for module in self.modules[name].attachedModules.values():
res = go(module.name)
if not res:
return False
visited.remove(name)
done.add(name)
l.append(name)
return True
unmarked = set(self.modules.keys()) # unvisited nodes
visited = set() # visited in DFS, but not completed
done = set()
l = [] # list of sorted modules
while unmarked:
if not go(unmarked.pop()):
self.log.error('cyclical dependency between modules!')
return l[::-1] + list(visited) + list(unmarked)
return l[::-1]

View File

@ -1,164 +0,0 @@
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
"""convenience class to create a struct Parameter together with indivdual params
Usage:
class Controller(Drivable):
...
ctrlpars = StructParam('ctrlpars struct', [
('pid_p', 'p', Parameter('control parameter p', FloatRange())),
('pid_i', 'i', Parameter('control parameter i', FloatRange())),
('pid_d', 'd', Parameter('control parameter d', FloatRange())),
], readonly=False)
...
then implement either read_ctrlpars and write_ctrlpars or
read_pid_p, read_pid_i, read_pid_d, write_pid_p, write_pid_i and write_pid_d
the methods not implemented will be created automatically
"""
from frappy.core import Parameter, Property
from frappy.datatypes import BoolType, DataType, StructOf, ValueType
from frappy.errors import ProgrammingError
class StructParam(Parameter):
"""create a struct parameter together with individual parameters
in addition to normal Parameter arguments:
:param paramdict: dict <member name> of Parameter(...)
:param prefix_or_map: either a prefix for the parameter name to add to the member name
or a dict <member name> or <paramerter name>
"""
# use properties, as simple attributes are not considered on copy()
paramdict = Property('dict <parametername> of Parameter(...)', ValueType())
hasStructRW = Property('has a read_<struct param> or write_<struct param> method',
BoolType(), default=False)
insideRW = 0 # counter for avoiding multiple superfluous updates
def __init__(self, description=None, paramdict=None, prefix_or_map='', *, datatype=None, readonly=False, **kwds):
if isinstance(paramdict, DataType):
raise ProgrammingError('second argument must be a dict of Param')
if datatype is None and paramdict is not None: # omit the following on Parameter.copy()
if isinstance(prefix_or_map, str):
prefix_or_map = {m: prefix_or_map + m for m in paramdict}
for membername, param in paramdict.items():
param.name = prefix_or_map[membername]
datatype = StructOf(**{m: p.datatype for m, p in paramdict.items()})
kwds['influences'] = [p.name for p in paramdict.values()]
self.updateEnable = {}
super().__init__(description, datatype, paramdict=paramdict, readonly=readonly, **kwds)
def __set_name__(self, owner, name):
# names of access methods of structed param (e.g. ctrlpars)
struct_read_name = f'read_{name}' # e.g. 'read_ctrlpars'
struct_write_name = f'write_{name}' # e.h. 'write_ctrlpars'
self.hasStructRW = hasattr(owner, struct_read_name) or hasattr(owner, struct_write_name)
for membername, param in self.paramdict.items():
pname = param.name
changes = {
'readonly': self.readonly,
'influences': set(param.influences) | {name},
}
param.ownProperties.update(changes)
param.init(changes)
setattr(owner, pname, param)
param.__set_name__(owner, param.name)
if self.hasStructRW:
rname = f'read_{pname}'
if not hasattr(owner, rname):
def rfunc(self, membername=membername, struct_read_name=struct_read_name):
return getattr(self, struct_read_name)()[membername]
rfunc.poll = False # read_<struct param> is polled only
setattr(owner, rname, rfunc)
if not self.readonly:
wname = f'write_{pname}'
if not hasattr(owner, wname):
def wfunc(self, value, membername=membername,
name=name, rname=rname, struct_write_name=struct_write_name):
valuedict = dict(getattr(self, name))
valuedict[membername] = value
getattr(self, struct_write_name)(valuedict)
return getattr(self, rname)()
setattr(owner, wname, wfunc)
if not self.hasStructRW:
if not hasattr(owner, struct_read_name):
def struct_read_func(self, name=name, flist=tuple(
(m, f'read_{p.name}') for m, p in self.paramdict.items())):
pobj = self.parameters[name]
# disable updates generated from the callbacks of individual params
pobj.insideRW += 1 # guarded by self.accessLock
try:
return {m: getattr(self, f)() for m, f in flist}
finally:
pobj.insideRW -= 1
setattr(owner, struct_read_name, struct_read_func)
if not (self.readonly or hasattr(owner, struct_write_name)):
def struct_write_func(self, value, name=name, funclist=tuple(
(m, f'write_{p.name}') for m, p in self.paramdict.items())):
pobj = self.parameters[name]
pobj.insideRW += 1 # guarded by self.accessLock
try:
return {m: getattr(self, f)(value[m]) for m, f in funclist}
finally:
pobj.insideRW -= 1
setattr(owner, struct_write_name, struct_write_func)
super().__set_name__(owner, name)
def finish(self, modobj=None):
"""register callbacks for consistency"""
super().finish(modobj)
if modobj:
if self.hasStructRW:
def cb(value, modobj=modobj, structparam=self):
for membername, param in structparam.paramdict.items():
setattr(modobj, param.name, value[membername])
modobj.valueCallbacks[self.name].append(cb)
else:
for membername, param in self.paramdict.items():
def cb(value, modobj=modobj, structparam=self, membername=membername):
if not structparam.insideRW:
prev = dict(getattr(modobj, structparam.name))
prev[membername] = value
setattr(modobj, structparam.name, prev)
modobj.valueCallbacks[param.name].append(cb)

View File

@ -70,11 +70,12 @@ def get_version(abbrev=4):
if git_version != release_version:
write_release_version(git_version)
return git_version
if release_version:
elif release_version:
return release_version
raise ValueError('Cannot find a version number - make sure that '
'git is installed or a RELEASE-VERSION file is '
'present!')
else:
raise ValueError('Cannot find a version number - make sure that '
'git is installed or a RELEASE-VERSION file is '
'present!')
if __name__ == "__main__":

View File

@ -354,7 +354,7 @@ class Cryostat(CryoBase):
timestamp = t
self.read_value()
def shutdownModule(self):
def shutdown(self):
# should be called from server when the server is stopped
self._stopflag = True
if self._thread and self._thread.is_alive():

View File

@ -24,33 +24,10 @@
import random
from frappy.datatypes import FloatRange, StringType, ValueType, TupleOf, StructOf, ArrayOf
from frappy.modules import Communicator, Drivable, Parameter, Property, Readable, Module, Attached
from frappy.modules import Communicator, Drivable, Parameter, Property, Readable, Module
from frappy.params import Command
from frappy.dynamic import Pinata
from frappy.errors import RangeError
class Pin(Pinata):
def scanModules(self):
yield ('pin_a', {'cls': LN2, 'description':'hi'})
yield ('pin_b', {'cls': LN2, 'description':'hi'})
class RecPin(Pinata):
def scanModules(self):
yield ('rec_a', {'cls': RecPinInner, 'description':'hi'})
yield ('rec_b', {'cls': RecPinInner, 'description':'hi'})#, 'idx':'_2'})
class RecPinInner(Pinata):
idx = Property('', StringType(), default='')
def scanModules(self):
yield ('pin_pin_a' + self.idx, {'cls': Mapped, 'description':'recursive!', 'choices':['A', 'B']})
yield ('pin_pin_b' + self.idx, {'cls': Mapped, 'description':'recursive!', 'choices':['A', 'B']})
class WithAtt(Readable):
att = Attached()
def read_value(self):
return self.att.read_value()
class LN2(Readable):
"""Just a readable.

View File

@ -1,234 +0,0 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Georg Brandl <g.brandl@fz-juelich.de>
# Alexander Zaft <a.zaft@fz-juelich.de>
#
# *****************************************************************************
"""Adapter to the existing SEOP 3He spin filter system daemon."""
from os import path
# eventually he3control
from he3d import he3cell # pylint: disable=import-error
from frappy.core import Attached
from frappy.datatypes import ArrayOf, FloatRange, IntRange, StatusType, \
StringType, TupleOf
from frappy.errors import CommandRunningError
from frappy.modules import Command, Drivable, Module, Parameter, Property, \
Readable
from frappy.rwhandler import CommonReadHandler
integral = IntRange()
floating = FloatRange()
string = StringType()
# Configuration is kept in YAML files to stay compatible to the
# traditional 3He daemon, for now.
class Cell(Module):
""" Dummy module for creating He3Cell object in order for other modules to talk to the hardware.
Only deals with the config, and rotating the paramlog.
"""
config_directory = Property(
'Directory for the YAML config files', datatype=string)
def initModule(self):
super().initModule()
self.cell = he3cell.He3_cell(
path.join(self.config_directory, 'cell.yml'))
# Commands
@Command(result=string)
def raw_config_file(self):
"""return unparsed contents of yaml file"""
with open(self.cell._He3_cell__cfg_filename, 'r', encoding='utf-8') as f:
return str(f.read())
@Command(string, result=string)
def cfg_get(self, identifier):
"""Get a configuration value."""
return str(self.cell.cfg_get(identifier))
@Command((string, string), result=string)
def cfg_set(self, identifier, value):
"""Set a configuration value."""
try:
value = int(value)
except ValueError:
try:
value = float(value)
except ValueError:
pass
# The type is lost during transmission.
# Check type so the value to be set has the same type and
# is not eg. a string where an int would be needed in the config key.
oldty = type(self.cell.cfg_get(identifier))
if oldty is not type(value):
raise ValueError('Type of value to be set does not match the '
'value in the configuration!')
return str(self.cell.cfg_set(identifier, value))
@Command()
def nmr_paramlog_rotate(self):
"""resets fitting and switches to a new logfile"""
self.cell.nmr_paramlog_rotate()
class Afp(Readable):
"""Polarisation state of the SEOP waveplates"""
value = Parameter('Current polarisation state of the SEOP waveplates', IntRange(0, 1))
cell = Attached(Cell)
def read_value(self):
return self.cell.cell.afp_state_get()
# Commands
@Command(description='Flip polarization of SEOP waveplates')
def afp_flip(self):
self.cell.cell.afp_flip_do()
self.read_value()
class Nmr(Readable):
Status = Drivable.Status
status = Parameter(datatype=StatusType(Drivable.Status))
value = Parameter('Timestamp of last NMR', string)
cell = Attached(Cell)
def initModule(self):
super().initModule()
self.interval = 0
def read_value(self):
return str(self.cell.cell.nmr_timestamp_get())
def read_status(self):
cellstate = self.cell.cell.nmr_state_get()
if self.cell.cell.nmr_background_check():
status = self.Status.BUSY, 'running every %d seconds' % self.interval
else:
status = self.Status.IDLE, 'not running'
# TODO: what do we do here with None and -1?
# -> None basically indicates that the fit for the parameters did not converge
if cellstate is None:
return self.Status.IDLE, f'returned None, {status[1]}'
if cellstate in (0, 1):
return status[0], f'nmr cellstate {cellstate}, {status[1]}'
if cellstate == -1:
return self.Status.WARN, f'got error from cell, {status[1]}'
return self.Status.ERROR, 'Unrecognized cellstate!'
# Commands
@Command()
def nmr_do(self):
"""Triggers the NMR to run"""
self.cell.cell.nmr_do()
self.read_status()
@Command()
def bgstart(self):
"""Start background NMR"""
if self.isBusy():
raise CommandRunningError('backgroundNMR is already running')
interval = self.cell.cell.cfg_get('tasks/nmr/background/interval')
self.interval = interval
self.cell.cell.nmr_background_start(interval)
self.read_status()
@Command()
def bgstop(self):
"""Stop background NMR"""
self.cell.cell.nmr_background_stop()
self.read_status()
# Commands to get large datasets we do not want directly in the NICOS cache
@Command(result=TupleOf(ArrayOf(floating, maxlen=100000),
ArrayOf(floating, maxlen=100000)))
def get_processed_nmr(self):
"""Get data for processed signal."""
val= self.cell.cell.nmr_processed_get()
return (val['xval'], val['yval'])
@Command(result=TupleOf(ArrayOf(floating, maxlen=100000),
ArrayOf(floating, maxlen=100000)))
def get_raw_nmr(self):
"""Get raw signal data."""
val = self.cell.cell.nmr_raw_get()
return (val['xval'], val['yval'])
@Command(result=TupleOf(ArrayOf(floating, maxlen=100000),
ArrayOf(floating, maxlen=100000)))
def get_raw_spectrum(self):
"""Get the raw spectrum."""
val = self.cell.cell.nmr_raw_spectrum_get()
y = val['yval'][:len(val['xval'])]
return (val['xval'], y)
@Command(result=TupleOf(ArrayOf(floating, maxlen=100000),
ArrayOf(floating, maxlen=100000)))
def get_processed_spectrum(self):
"""Get the processed spectrum."""
val = self.cell.cell.nmr_processed_spectrum_get()
x = val['xval'][:len(val['yval'])]
return (x, val['yval'])
@Command(result=TupleOf(ArrayOf(string, maxlen=100),
ArrayOf(floating, maxlen=100)))
def get_amplitude(self):
"""Last 20 amplitude datapoints."""
rv = self.cell.cell.nmr_paramlog_get('amplitude', 20)
x = [ str(timestamp) for timestamp in rv['xval']]
return (x,rv['yval'])
@Command(result=TupleOf(ArrayOf(string, maxlen=100),
ArrayOf(floating, maxlen=100)))
def get_phase(self):
"""Last 20 phase datapoints."""
val = self.cell.cell.nmr_paramlog_get('phase', 20)
return ([str(timestamp) for timestamp in val['xval']], val['yval'])
class FitParam(Readable):
value = Parameter('fitted value', unit='$', default=0.0)
sigma = Parameter('variance of the fitted value', FloatRange(), default=0.0)
param = Property('the parameter that should be accesssed',
StringType(), export=False)
cell = Attached(Cell)
@CommonReadHandler(['value', 'sigma'])
def read_amplitude(self):
ret = self.cell.cell.nmr_param_get(self.param)
self.value = ret['value']
self.sigma = ret['sigma']
# Commands
@Command(integral, result=TupleOf(ArrayOf(string),
ArrayOf(floating)))
def nmr_paramlog_get(self, n):
"""returns the log of the last 'n' values for this parameter"""
val = self.cell.cell.nmr_paramlog_get(self.param, n)
return ([str(timestamp) for timestamp in val['xval']], val['yval'])

View File

@ -1,3 +1,5 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
@ -15,13 +17,64 @@
#
# Module authors:
# Oksana Shliakhtun <oksana.shliakhtun@psi.ch>
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""bath thermostat Thermo Scientific™ ARCTIC A10 Refrigerated Circulators"""
""" RUFS Command: Description of Bits
from frappy.core import Command, StringIO, Parameter, HasIO, \
====== ======================================================== ==============================================
Value Description
====== ======================================================== ==============================================
V1
B6: warning, rtd1 (internal temp. sensor) is shorted
B0 --> 1
B7: warning, rtd1 is open
B1 --> 2
V2
B0: error, HTC (high temperature cutout) fault B2 --> 4
B1: error, high RA (refrigeration) temperature fault B3 --> 8
V3 B4 --> 16
B0: warning, low level in the bath
B5 --> 32
B1: warning, low temperature
B6 --> 64
B2: warning, high temperature
B7 --> 128
B3: error, low level in the bath
B4: error, low temperature fault
B5: error, high temperature fault
B6: error, low temperature fixed* fault
B7: error, high temperature fixed** fault
V4
B3: idle, circulator** is running
B5: error, circulator** fault
V5
B0: error, pump speed fault
B1: error, motor overloaded
B2: error, high pressure cutout
B3: idle, maximum cooling
B4: idle, cooling
B5: idle, maximum heating
B6: idle, heating
====== ======================================================== ==============================================
"""
from frappy.core import StringIO, Parameter, HasIO, \
Drivable, FloatRange, IDLE, BUSY, ERROR, WARN, BoolType
from frappy.structparam import StructParam
from frappy_psi.convergence import HasConvergence
@ -32,17 +85,17 @@ class ThermFishIO(StringIO):
class TemperatureLoopA10(HasConvergence, HasIO, Drivable):
ioClass = ThermFishIO
value = Parameter('internal temperature', unit='degC')
value = Parameter('temperature', unit='degC')
target = Parameter('setpoint/target', datatype=FloatRange, unit='degC', default=0)
control_active = Parameter('circilation and control is on', BoolType(), default=False)
ctrlpars = StructParam('control parameters struct', dict(
p_heat = Parameter('proportional heat parameter', FloatRange()),
i_heat = Parameter('integral heat parameter', FloatRange()),
d_heat = Parameter('derivative heat parameter', FloatRange()),
p_cool = Parameter('proportional cool parameter', FloatRange()),
i_cool = Parameter('integral cool parameter', FloatRange()),
d_cool = Parameter('derivative cool parameter', FloatRange()),
), readonly=False)
circ_on = Parameter('is circulation running', BoolType(), readonly=False, default=False)
# pids
p_heat = Parameter('proportional heat parameter', FloatRange(), readonly=False)
i_heat = Parameter('integral heat parameter', FloatRange(), readonly=False)
d_heat = Parameter('derivative heat parameter', FloatRange(), readonly=False)
p_cool = Parameter('proportional cool parameter', FloatRange(), readonly=False)
i_cool = Parameter('integral cool parameter', FloatRange(), readonly=False)
d_cool = Parameter('derivative cool parameter', FloatRange(), readonly=False)
status_messages = [
(ERROR, 'high tempr. cutout fault', 2, 0),
@ -69,22 +122,20 @@ class TemperatureLoopA10(HasConvergence, HasIO, Drivable):
]
def get_par(self, cmd):
"""get parameter and convert to float
"""
All the reading commands starts with 'R', in the source code all the commands are written without 'R' (except
'RUFS').The result of a reading command is a value in the format '20C', without spaces.
:param cmd: hardware command without the leading 'R'
:param cmd: any hardware command
:return: result converted to float
:return: 'R'+cmd
"""
new_cmd = 'R' + cmd
reply = self.communicate(new_cmd).strip()
while reply[-1].isalpha():
reply = reply[:-1]
reply = self.communicate(new_cmd)
if any(unit.isalpha() for unit in reply):
reply = ''.join(unit for unit in reply if not unit.isalpha())
return float(reply)
def set_par(self, cmd, value):
self.communicate(f'S{cmd} {value}')
return self.get_par(cmd)
def read_value(self):
"""
Reading internal temperature sensor value.
@ -92,34 +143,6 @@ class TemperatureLoopA10(HasConvergence, HasIO, Drivable):
return self.get_par('T')
def read_status(self):
""" convert from RUFS Command: Description of Bits
====== ======================================================== ===============
Value Description
====== ======================================================== ===============
V1 B6: warning, rtd1 (internal temp. sensor) is shorted B0 --> 1
B7: warning, rtd1 is open B1 --> 2
V2 B0: error, HTC (high temperature cutout) fault B2 --> 4
B1: error, high RA (refrigeration) temperature fault B3 --> 8
V3 B0: warning, low level in the bath B5 --> 32
B1: warning, low temperature B6 --> 64
B2: warning, high temperature B7 --> 128
B3: error, low level in the bath
B4: error, low temperature fault
B5: error, high temperature fault
B6: error, low temperature fixed* fault
B7: error, high temperature fixed** fault
V4 B3: idle, circulator** is running
B5: error, circulator** fault
V5 B0: error, pump speed fault
B1: error, motor overloaded
B2: error, high pressure cutout
B3: idle, maximum cooling
B4: idle, cooling
B5: idle, maximum heating
B6: idle, heating
====== ======================================================== ===============
"""
result_str = self.communicate('RUFS') # read unit fault status
values_str = result_str.strip().split()
values_int = [int(val) for val in values_str]
@ -134,55 +157,72 @@ class TemperatureLoopA10(HasConvergence, HasIO, Drivable):
return status_type, status_msg
return WARN, 'circulation off'
def read_control_active(self):
return int(self.get_par('O'))
def read_circ_on(self):
return self.communicate('RO')
@Command
def control_off(self):
"""switch control and circulation off"""
self.control_active = self.set_par('O', 0)
def write_circ_on(self, circ_on):
circ_on_str = '1' if circ_on else '0'
self.communicate(f'SO {circ_on_str}')
return self.read_circ_on()
def read_target(self):
return self.get_par('S')
def write_target(self, target):
self.control_active = self.set_par('O', 1)
"""
:param target: here, it serves as an equivalent to a setpoint.
"""
self.write_circ_on('1')
self.communicate(f'SS {target}')
self.convergence_start()
return target
## heat PID
def read_p_heat(self):
return self.get_par('PH')
p_heat = self.get_par('PH')
return float(p_heat)
def write_p_heat(self, value):
return self.set_par('PH', value)
def write_p_heat(self, p_heat):
self.communicate(f'SPH {p_heat}')
return p_heat
def read_i_heat(self):
return self.get_par('IH')
i_heat = self.get_par('IH')
return float(i_heat)
def write_i_heat(self, value):
return self.set_par('IH', value)
def write_i_heat(self, i_heat):
self.communicate(f'SIH {i_heat}')
return i_heat
def read_d_heat(self):
return self.get_par('DH')
d_heat = self.get_par('DH')
return float(d_heat)
def write_d_heat(self, value):
return self.set_par('DH', value)
def write_d_heat(self, d_heat):
self.communicate(f'SDH {d_heat}')
return d_heat
## cool PID
def read_p_cool(self):
return self.get_par('PC')
p_cool = self.get_par('PC')
return float(p_cool)
def write_p_cool(self, value):
return self.set_par('PC', value)
def write_p_cool(self, p_cool):
self.communicate(f'SPC {p_cool}')
return p_cool
def read_i_cool(self):
return self.get_par('IC')
i_cool = self.get_par('IC')
return float(i_cool)
def write_i_cool(self, value):
return self.set_par('IC', value)
def write_i_cool(self, i_cool):
self.communicate(f'SIC {i_cool}')
return i_cool
def read_d_cool(self):
return self.get_par('DC')
d_cool = self.get_par('DC')
return float(d_cool)
def write_d_cool(self, value):
return self.set_par('DC', value)
def write_d_cool(self, d_cool):
self.communicate(f'SDC {d_cool}')
return d_cool

View File

@ -75,4 +75,6 @@ def test_attach():
assert m.propertyValues['att'] == 'a'
srv.dispatcher.register_module(a, 'a')
srv.dispatcher.register_module(m, 'm')
assert m.att == 'a'
m.attachedModules = {'att': a}
assert m.att == a

View File

@ -1,141 +0,0 @@
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Alexander Zaft <a.zaft@fz-juelich.de>
#
# *****************************************************************************
# false positive with fixtures
# pylint: disable=redefined-outer-name
import pytest
from frappy.config import Collector, Config, Mod, NodeCollector, load_config, \
process_file, to_config_path
from frappy.errors import ConfigError
from frappy.lib import generalConfig
class LoggerStub:
def debug(self, fmt, *args):
pass
info = warning = exception = error = debug
handlers = []
@pytest.fixture
def log():
return LoggerStub()
PY_FILE = """Node('foonode', 'fodesc', 'fooface')
Mod('foo', 'frappy.modules.Readable', 'description', value=5)
Mod('bar', 'frappy.modules.Readable', 'about me', export=False)
Mod('baz', 'frappy.modules.Readable', 'things', value=Param(3, unit='BAR'))
"""
# fixture file system, TODO: make a bit nicer?
@pytest.fixture
def direc(tmp_path_factory):
d = tmp_path_factory.mktemp('cfgdir')
a = d / 'a'
b = d / 'b'
a.mkdir()
b.mkdir()
f = a / 'config_cfg.py'
pyfile = a / 'pyfile_cfg.py'
ff = b / 'test_cfg.py'
fff = b / 'alsoworks.py'
f.touch()
ff.touch()
fff.touch()
pyfile.write_text(PY_FILE)
generalConfig.testinit(confdir=f'{a}:{b}', piddir=str(d))
return d
files = [('config', 'a/config_cfg.py'),
('config_cfg', 'a/config_cfg.py'),
('config_cfg.py', 'a/config_cfg.py'),
('test', 'b/test_cfg.py'),
('test_cfg', 'b/test_cfg.py'),
('test_cfg.py', 'b/test_cfg.py'),
('alsoworks', 'b/alsoworks.py'),
('alsoworks.py', 'b/alsoworks.py'),
]
@pytest.mark.parametrize('file, res', files)
def test_to_cfg_path(log, direc, file, res):
assert to_config_path(file, log).endswith(res)
def test_cfg_not_existing(direc, log):
with pytest.raises(ConfigError):
to_config_path('idonotexist', log)
def collector_helper(node, mods):
n = NodeCollector()
n.add(*node)
m = Collector(Mod)
m.list = [Mod(module, '', '') for module in mods]
return n, m
configs = [
(['n1', 'desc', 'iface'], ['foo', 'bar', 'baz'], ['n2', 'foo', 'bar'],
['foo', 'more', 'other'], ['n1', 'iface', 5, {'foo'}]),
(['n1', 'desc', 'iface'], ['foo', 'bar', 'baz'], ['n2', 'foo', 'bar'],
['different', 'more', 'other'], ['n1', 'iface', 6, set()]),
]
@pytest.mark.parametrize('n1, m1, n2, m2, res', configs)
def test_merge(n1, m1, n2, m2, res):
name, iface, num_mods, ambig = res
c1 = Config(*collector_helper(n1, m1))
c2 = Config(*collector_helper(n2, m2))
c1.merge_modules(c2)
assert c1['node']['equipment_id'] == name
assert c1['node']['interface'] == iface
assert len(c1.module_names) == num_mods
assert c1.ambiguous == ambig
def do_asserts(ret):
assert len(ret.module_names) == 3
assert set(ret.module_names) == set(['foo', 'bar', 'baz'])
assert ret['node']['equipment_id'] == 'foonode'
assert ret['node']['interface'] == 'fooface'
assert ret['foo'] == {'cls': 'frappy.modules.Readable',
'description': 'description', 'value': {'value': 5}}
assert ret['bar'] == {'cls': 'frappy.modules.Readable',
'description': 'about me', 'export': {'value': False}}
assert ret['baz'] == {'cls': 'frappy.modules.Readable',
'description': 'things',
'value': {'value': 3, 'unit': 'BAR'}}
def test_process_file(direc, log):
ret = process_file(str(direc / 'a' / 'pyfile_cfg.py'), log)
do_asserts(ret)
def test_full(direc, log):
ret = load_config('pyfile_cfg.py', log)
do_asserts(ret)

View File

@ -1,132 +0,0 @@
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
"""test frappy.mixins.HasCtrlPars"""
from test.test_modules import LoggerStub, ServerStub
from frappy.core import FloatRange, Module, Parameter
from frappy.structparam import StructParam
def test_with_read_ctrlpars():
class Mod(Module):
ctrlpars = StructParam('ctrlpar struct', dict(
p = Parameter('control parameter p', FloatRange()),
i = Parameter('control parameter i', FloatRange()),
d = Parameter('control parameter d', FloatRange()),
), 'pid_', readonly=False)
def read_ctrlpars(self):
return self._ctrlpars
def write_ctrlpars(self, value):
self._ctrlpars = value
return self.read_ctrlpars()
logger = LoggerStub()
updates = {}
srv = ServerStub(updates)
ms = Mod('ms', logger, {'description':''}, srv)
value = {'p': 1, 'i': 2, 'd': 3}
assert ms.write_ctrlpars(value) == value
assert ms.read_ctrlpars() == value
assert ms.read_pid_p() == 1
assert ms.read_pid_i() == 2
assert ms.read_pid_d() == 3
assert ms.write_pid_i(5) == 5
assert ms.write_pid_d(0) == 0
assert ms.read_ctrlpars() == {'p': 1, 'i': 5, 'd': 0}
assert set(Mod.ctrlpars.influences) == {'pid_p', 'pid_i', 'pid_d'}
assert Mod.pid_p.influences == ('ctrlpars',)
assert Mod.pid_i.influences == ('ctrlpars',)
assert Mod.pid_d.influences == ('ctrlpars',)
def test_without_read_ctrlpars():
class Mod(Module):
ctrlpars = StructParam('ctrlpar struct', dict(
p = Parameter('control parameter p', FloatRange()),
i = Parameter('control parameter i', FloatRange()),
d = Parameter('control parameter d', FloatRange()),
), readonly=False)
_pid_p = 0
_pid_i = 0
def read_p(self):
return self._pid_p
def write_p(self, value):
self._pid_p = value
return self.read_p()
def read_i(self):
return self._pid_i
def write_i(self, value):
self._pid_i = value
return self.read_i()
logger = LoggerStub()
updates = {}
srv = ServerStub(updates)
ms = Mod('ms', logger, {'description': ''}, srv)
value = {'p': 1, 'i': 2, 'd': 3}
assert ms.write_ctrlpars(value) == value
assert ms.read_ctrlpars() == value
assert ms.read_p() == 1
assert ms.read_i() == 2
assert ms.read_d() == 3
assert ms.write_i(5) == 5
assert ms.write_d(0) == 0
assert ms.read_ctrlpars() == {'p': 1, 'i': 5, 'd': 0}
assert set(Mod.ctrlpars.influences) == {'p', 'i', 'd'}
assert Mod.p.influences == ('ctrlpars',)
assert Mod.i.influences == ('ctrlpars',)
assert Mod.d.influences == ('ctrlpars',)
def test_readonly():
class Mod(Module):
ctrlpars = StructParam('ctrlpar struct', dict(
p = Parameter('control parameter p', FloatRange()),
i = Parameter('control parameter i', FloatRange()),
d = Parameter('control parameter d', FloatRange()),
), {'p': 'pp', 'i':'ii', 'd': 'dd'}, readonly=True)
assert Mod.ctrlpars.readonly is True
assert Mod.pp.readonly is True
assert Mod.ii.readonly is True
assert Mod.dd.readonly is True
def test_order_dependence1():
test_without_read_ctrlpars()
test_with_read_ctrlpars()
def test_order_dependence2():
test_with_read_ctrlpars()
test_without_read_ctrlpars()

View File

@ -22,7 +22,7 @@
import pytest
from frappy.lib import parse_host_port, merge_status
from frappy.lib import parse_host_port
@pytest.mark.parametrize('hostport, defaultport, result', [
@ -46,19 +46,3 @@ def test_parse_host(hostport, defaultport, result):
parse_host_port(hostport, defaultport)
else:
assert result == parse_host_port(hostport, defaultport)
@pytest.mark.parametrize('args, result', [
([(100, 'idle'), (200, 'warning')],
(200, 'warning')),
([(300, 'ramping'), (300, 'within tolerance')],
(300, 'ramping, within tolerance')),
([(300, 'ramping, within tolerance'), (300, 'within tolerance, slow'), (200, 'warning')],
(300, 'ramping, within tolerance, slow')),
# when a comma is used for other purposes than separating individual status texts,
# the behaviour might not be as desired. However, this case is somewhat constructed.
([(100, 'blue, yellow is my favorite'), (100, 'white, blue, red is a bad color mix')],
(100, 'blue, yellow is my favorite, white, red is a bad color mix')),
])
def test_merge_status(args, result):
assert merge_status(*args) == result

View File

@ -1,55 +0,0 @@
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Alexander Zaft <a.zaft@fz-juelich.de>
#
# *****************************************************************************
import pytest
# pylint: disable=redefined-outer-name
from frappy.server import Server
from .test_config import direc # pylint: disable=unused-import
class LoggerStub:
def debug(self, fmt, *args):
pass
def getChild(self, *args):
return self
info = warning = exception = error = debug
handlers = []
@pytest.fixture
def log():
return LoggerStub()
def test_name_only(direc, log):
"""only see that this does not throw. get config from name."""
s = Server('pyfile', log)
s._processCfg()
def test_file(direc, log):
"""only see that this does not throw. get config from cfgfiles."""
s = Server('foo', log, cfgfiles='pyfile_cfg.py')
s._processCfg()