reworking messages

1) start 'bin/secop-server test'
2) connect to localhost port 10767
3) enter help<enter>
4) enjoy

Change-Id: I488d5f9cdca8c91c583691ab23f541a4a8759f4e
This commit is contained in:
Enrico Faulhaber
2016-09-29 18:27:33 +02:00
parent dc2d0a10aa
commit b6af55c358
49 changed files with 3682 additions and 1034 deletions

View File

312
secop/devices/core.py Normal file
View File

@@ -0,0 +1,312 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
#
# *****************************************************************************
"""Define Baseclasses for real devices implemented in the server"""
# XXX: connect with 'protocol'-Devices.
# Idea: every Device defined herein is also a 'protocol'-device,
# all others MUST derive from those, the 'interface'-class is still derived
# from these base classes (how to do this?)
import time
import types
import inspect
import threading
from secop.errors import ConfigError, ProgrammingError
from secop.protocol import status
from secop.validators import enum, vector, floatrange
EVENT_ONLY_ON_CHANGED_VALUES = False
# storage for PARAMeter settings:
# if readonly is False, the value can be changed (by code, or remote)
# if no default is given, the parameter MUST be specified in the configfile
# during startup, value is initialized with the default value or
# from the config file if specified there
class PARAM(object):
def __init__(self, description, validator=float, default=Ellipsis,
unit=None, readonly=True, export=True):
if isinstance(description, PARAM):
# make a copy of a PARAM object
self.__dict__.update(description.__dict__)
return
self.description = description
self.validator = validator
self.default = default
self.unit = unit
self.readonly = readonly
self.export = export
# internal caching: value and timestamp of last change...
self.value = default
self.timestamp = 0
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, ', '.join(
['%s=%r' % (k, v) for k, v in sorted(self.__dict__.items())]))
# storage for CMDs settings (description + call signature...)
class CMD(object):
def __init__(self, description, arguments, result):
# descriptive text for humans
self.description = description
# list of validators for arguments
self.arguments = arguments
# validator for result
self.resulttype = result
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, ', '.join(
['%s=%r' % (k, v) for k, v in sorted(self.__dict__.items())]))
# Meta class
# warning: MAGIC!
class DeviceMeta(type):
def __new__(mcs, name, bases, attrs):
newtype = type.__new__(mcs, name, bases, attrs)
if '__constructed__' in attrs:
return newtype
# merge PARAM and CMDS from all sub-classes
for entry in ['PARAMS', 'CMDS']:
newentry = {}
for base in reversed(bases):
if hasattr(base, entry):
newentry.update(getattr(base, entry))
newentry.update(attrs.get(entry, {}))
setattr(newtype, entry, newentry)
# check validity of PARAM entries
for pname, pobj in newtype.PARAMS.items():
# XXX: allow dicts for overriding certain aspects only.
if not isinstance(pobj, PARAM):
raise ProgrammingError('%r: device PARAM %r should be a '
'PARAM object!' % (name, pname))
# XXX: create getters for the units of params ??
# wrap of reading/writing funcs
rfunc = attrs.get('read_' + pname, None)
def wrapped_rfunc(self, maxage=0, pname=pname, rfunc=rfunc):
if rfunc:
value = rfunc(self, maxage)
setattr(self, pname, value)
return value
else:
# return cached value
return self.PARAMS[pname].value
if rfunc:
wrapped_rfunc.__doc__ = rfunc.__doc__
setattr(newtype, 'read_' + pname, wrapped_rfunc)
if not pobj.readonly:
wfunc = attrs.get('write_' + pname, None)
def wrapped_wfunc(self, value, pname=pname, wfunc=wfunc):
self.log.debug("wfunc: set %s to %r" % (pname, value))
pobj = self.PARAMS[pname]
value = pobj.validator(value) if pobj.validator else value
if wfunc:
value = wfunc(self, value) or value
# XXX: use setattr or direct manipulation
# of self.PARAMS[pname]?
setattr(self, pname, value)
return value
if wfunc:
wrapped_wfunc.__doc__ = wfunc.__doc__
setattr(newtype, 'write_' + pname, wrapped_wfunc)
def getter(self, pname=pname):
return self.PARAMS[pname].value
def setter(self, value, pname=pname):
pobj = self.PARAMS[pname]
value = pobj.validator(value) if pobj.validator else value
pobj.timestamp = time.time()
if not EVENT_ONLY_ON_CHANGED_VALUES or (value != pobj.value):
pobj.value = value
# also send notification
self.log.debug('%s is now %r' % (pname, value))
self.DISPATCHER.announce_update(self, pname, pobj)
setattr(newtype, pname, property(getter, setter))
# also collect/update information about CMD's
setattr(newtype, 'CMDS', getattr(newtype, 'CMDS', {}))
for name in attrs:
if name.startswith('do'):
if name[2:] in newtype.CMDS:
continue
value = getattr(newtype, name)
if isinstance(value, types.MethodType):
argspec = inspect.getargspec(value)
if argspec[0] and argspec[0][0] == 'self':
del argspec[0][0]
newtype.CMDS[name[2:]] = CMD(getattr(value, '__doc__'),
argspec.args, None) # XXX: find resulttype!
attrs['__constructed__'] = True
return newtype
# Basic device class
#
# within devices, parameters should only be addressed as self.<pname>
# i.e. self.value, self.target etc...
# these are accesses to the cached version.
# they can also be written to
# (which auto-calls self.write_<pname> and generate an async update)
# if you want to 'update from the hardware', call self.read_<pname>
# the return value of this method will be used as the new cached value and
# be returned.
class Device(object):
"""Basic Device, doesn't do much"""
__metaclass__ = DeviceMeta
# PARAMS and CMDS are auto-merged upon subclassing
PARAMS = {
'baseclass': PARAM('protocol defined interface class',
default="Device", validator=str),
}
CMDS = {}
DISPATCHER = None
def __init__(self, logger, cfgdict, devname, dispatcher):
# remember the dispatcher object (for the async callbacks)
self.DISPATCHER = dispatcher
self.log = logger
self.name = devname
# make local copies of PARAMS
params = {}
for k, v in self.PARAMS.items():
params[k] = PARAM(v)
mycls = self.__class__
myclassname = '%s.%s' % (mycls.__module__, mycls.__name__)
params['class'] = PARAM('implementation specific class name',
default=myclassname, validator=str)
self.PARAMS = params
# check config for problems
# only accept config items specified in PARAMS
for k, v in cfgdict.items():
if k not in self.PARAMS:
raise ConfigError('Device %s:config Parameter %r '
'not unterstood!' % (self.name, k))
# complain if a PARAM entry has no default value and
# is not specified in cfgdict
for k, v in self.PARAMS.items():
if k not in cfgdict:
if v.default is Ellipsis and k != 'value':
# Ellipsis is the one single value you can not specify....
raise ConfigError('Device %s: Parameter %r has no default '
'value and was not given in config!'
% (self.name, k))
# assume default value was given
cfgdict[k] = v.default
# replace CLASS level PARAM objects with INSTANCE level ones
self.PARAMS[k] = PARAM(self.PARAMS[k])
# now 'apply' config:
# pass values through the validators and store as attributes
for k, v in cfgdict.items():
# apply validator, complain if type does not fit
validator = self.PARAMS[k].validator
if validator is not None:
# only check if validator given
try:
v = validator(v)
except ValueError as e:
raise ConfigError('Device %s: config parameter %r:\n%r'
% (self.name, k, e))
setattr(self, k, v)
self._requestLock = threading.RLock()
def init(self):
# may be overriden in derived classes to init stuff
self.log.debug('init()')
def _pollThread(self):
# may be overriden in derived classes to init stuff
self.log.debug('init()')
class Readable(Device):
"""Basic readable device
providing the readonly parameter 'value' and 'status'
"""
PARAMS = {
'baseclass': PARAM('protocol defined interface class',
default="Readable", validator=str),
'value': PARAM('current value of the device', readonly=True, default=0.),
'pollinterval': PARAM('sleeptime between polls', readonly=False, default=5, validator=floatrange(1,120),),
'status': PARAM('current status of the device', default=status.OK,
validator=enum(**{'idle': status.OK,
'BUSY': status.BUSY,
'WARN': status.WARN,
'UNSTABLE': status.UNSTABLE,
'ERROR': status.ERROR,
'UNKNOWN': status.UNKNOWN}),
readonly=True),
'status2': PARAM('current status of the device', default=(status.OK, ''),
validator=vector(enum(**{'idle': status.OK,
'BUSY': status.BUSY,
'WARN': status.WARN,
'UNSTABLE': status.UNSTABLE,
'ERROR': status.ERROR,
'UNKNOWN': status.UNKNOWN}), str),
readonly=True),
}
def init(self):
Device.init(self)
self._pollthread = threading.Thread(target=self._pollThread)
self._pollthread.daemon = True
self._pollthread.start()
def _pollThread(self):
while True:
time.sleep(self.pollinterval)
for pname in self.PARAMS:
if pname != 'pollinterval':
rfunc = getattr(self, 'read_%s' % pname, None)
if rfunc:
rfunc()
class Driveable(Readable):
"""Basic Driveable device
providing a settable 'target' parameter to those of a Readable
"""
PARAMS = {
'baseclass': PARAM('protocol defined interface class',
default="Driveable", validator=str),
'target': PARAM('target value of the device', default=0.,
readonly=False),
}
def doStop(self):
time.sleep(1) # for testing !

327
secop/devices/cryo.py Normal file
View File

@@ -0,0 +1,327 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
# *****************************************************************************
"""playing implementation of a (simple) simulated cryostat"""
from math import atan
import time
import random
import threading
from secop.devices.core import Driveable, CONFIG, PARAM
from secop.protocol import status
from secop.validators import floatrange, positive, enum
from secop.lib import clamp
class Cryostat(Driveable):
"""simulated cryostat with:
- heat capacity of the sample
- cooling power
- thermal transfer between regulation and samplen
"""
PARAMS = dict(
jitter=CONFIG("amount of random noise on readout values",
validator=floatrange(0, 1),
export=False,
),
T_start=CONFIG("starting temperature for simulation",
validator=positive, export=False,
),
looptime=CONFIG("timestep for simulation",
validator=positive, default=1, unit="s",
export=False,
),
ramp=PARAM("ramping speed in K/min",
validator=floatrange(0, 1e3), default=1,
),
setpoint=PARAM("ramping speed in K/min",
validator=float, default=1, readonly=True,
),
maxpower=PARAM("Maximum heater power in W",
validator=float, default=0, readonly=True, unit="W",
),
heater=PARAM("current heater setting in %",
validator=float, default=0, readonly=True, unit="%",
),
heaterpower=PARAM("current heater power in W",
validator=float, default=0, readonly=True, unit="W",
),
target=PARAM("target temperature in K",
validator=float, default=0, unit="K",
),
p=PARAM("regulation coefficient 'p' in %/K",
validator=positive, default=40, unit="%/K",
),
i=PARAM("regulation coefficient 'i'",
validator=floatrange(0, 100), default=10,
),
d=PARAM("regulation coefficient 'd'",
validator=floatrange(0, 100), default=2,
),
mode=PARAM("mode of regulation",
validator=enum('ramp', 'pid', 'openloop'), default='pid',
),
tolerance=PARAM("temperature range for stability checking",
validator=floatrange(0, 100), default=0.1, unit='K',
),
window=PARAM("time window for stability checking",
validator=floatrange(1, 900), default=30, unit='s',
),
timeout=PARAM("max waiting time for stabilisation check",
validator=floatrange(1, 36000), default=900, unit='s',
),
)
def init(self):
self._stopflag = False
self._thread = threading.Thread(target=self.thread)
self._thread.daemon = True
self._thread.start()
def read_status(self):
# instead of asking a 'Hardware' take the value from the simulation
return self.status
def read_value(self, maxage=0):
# return regulation value (averaged regulation temp)
return self.regulationtemp + \
self.config_jitter * (0.5 - random.random())
def read_target(self, maxage=0):
return self.target
def write_target(self, value):
self.target = value
# next request will see this status, until the loop updates it
self.status = (status.BUSY, 'new target set')
def read_maxpower(self, maxage=0):
return self.maxpower
def write_maxpower(self, newpower):
# rescale heater setting in % to keep the power
heat = max(0, min(100, self.heater * self.maxpower / float(newpower)))
self.heater = heat
self.maxpower = newpower
def doStop(self):
# stop the ramp by setting current setpoint as target
# XXX: discussion: take setpoint or current value ???
self.write_target(self.setpoint)
#
# calculation helpers
#
def __coolerPower(self, temp):
"""returns cooling power in W at given temperature"""
# quadratic up to 42K, is linear from 40W@42K to 100W@600K
# return clamp((temp-2)**2 / 32., 0., 40.) + temp * 0.1
return clamp(15 * atan(temp * 0.01) ** 3, 0., 40.) + temp * 0.1 - 0.2
def __coolerCP(self, temp):
"""heat capacity of cooler at given temp"""
return 75 * atan(temp / 50)**2 + 1
def __heatLink(self, coolertemp, sampletemp):
"""heatflow from sample to cooler. may be negative..."""
flow = (sampletemp - coolertemp) * \
((coolertemp + sampletemp) ** 2) / 400.
cp = clamp(self.__coolerCP(coolertemp) * self.__sampleCP(sampletemp),
1, 10)
return clamp(flow, -cp, cp)
def __sampleCP(self, temp):
return 3 * atan(temp / 30) + \
12 * temp / ((temp - 12.)**2 + 10) + 0.5
def __sampleLeak(self, temp):
return 0.02 / temp
def thread(self):
self.sampletemp = self.config_T_start
self.regulationtemp = self.config_T_start
self.status = status.OK
while not self._stopflag:
try:
self.__sim()
except Exception as e:
self.log.exception(e)
self.status = status.ERROR, str(e)
def __sim(self):
# complex thread handling:
# a) simulation of cryo (heat flow, thermal masses,....)
# b) optional PID temperature controller with windup control
# c) generating status+updated value+ramp
# this thread is not supposed to exit!
# local state keeping:
regulation = self.regulationtemp
sample = self.sampletemp
# keep history values for stability check
window = []
timestamp = time.time()
heater = 0
lastflow = 0
last_heaters = (0, 0)
delta = 0
I = D = 0
lastD = 0
damper = 1
lastmode = self.mode
while not self._stopflag:
t = time.time()
h = t - timestamp
if h < self.looptime / damper:
time.sleep(clamp(self.looptime / damper - h, 0.1, 60))
continue
# a)
sample = self.sampletemp
regulation = self.regulationtemp
heater = self.heater
heatflow = self.__heatLink(regulation, sample)
self.log.debug('sample = %.5f, regulation = %.5f, heatflow = %.5g'
% (sample, regulation, heatflow))
newsample = max(0,
sample + (self.__sampleLeak(sample) - heatflow) /
self.__sampleCP(sample) * h)
# avoid instabilities due to too small CP
newsample = clamp(newsample, sample, regulation)
regdelta = (heater * 0.01 * self.maxpower + heatflow -
self.__coolerPower(regulation))
newregulation = max(0, regulation +
regdelta / self.__coolerCP(regulation) * h)
# b) see
# http://brettbeauregard.com/blog/2011/04/
# improving-the-beginners-pid-introduction/
if self.mode != 'openloop':
# fix artefacts due to too big timesteps
# actually i would prefer reducing looptime, but i have no
# good idea on when to increase it back again
if heatflow * lastflow != -100:
if (newregulation - newsample) * (regulation - sample) < 0:
# newregulation = (newregulation + regulation) / 2
# newsample = (newsample + sample) / 2
damper += 1
lastflow = heatflow
error = self.setpoint - newregulation
# use a simple filter to smooth delta a little
delta = (delta + regulation - newregulation) / 2.
kp = self.p / 10. # LakeShore P = 10*k_p
ki = kp * abs(self.i) / 500. # LakeShore I = 500/T_i
kd = kp * abs(self.d) / 2. # LakeShore D = 2*T_d
P = kp * error
I += ki * error * h
D = kd * delta / h
# avoid reset windup
I = clamp(I, 0., 100.) # I is in %
# avoid jumping heaterpower if switching back to pid mode
if lastmode != self.mode:
# adjust some values upon switching back on
I = self.heater - P - D
v = P + I + D
# in damping mode, use a weighted sum of old + new heaterpower
if damper > 1:
v = ((damper ** 2 - 1) * self.heater + v) / damper ** 2
# damp oscillations due to D switching signs
if D * lastD < -0.2:
v = (v + heater) / 2.
# clamp new heater power to 0..100%
heater = clamp(v, 0., 100.)
lastD = D
self.log.debug('PID: P = %.2f, I = %.2f, D = %.2f, '
'heater = %.2f' % (P, I, D, heater))
# check for turn-around points to detect oscillations ->
# increase damper
x, y = last_heaters
if (x + 0.1 < y and y > heater + 0.1) or \
(x > y + 0.1 and y + 0.1 < heater):
damper += 1
last_heaters = (y, heater)
else:
# self.heaterpower is set manually, not by pid
heater = self.heater
last_heaters = (0, 0)
heater = round(heater, 3)
sample = newsample
regulation = newregulation
lastmode = self.mode
# c)
if self.setpoint != self.target:
if self.ramp == 0:
maxdelta = 10000
else:
maxdelta = self.ramp / 60. * h
try:
self.setpoint = round(self.setpoint +
clamp(self.target - self.setpoint,
-maxdelta, maxdelta), 3)
self.log.debug('setpoint changes to %r (target %r)' %
(self.setpoint, self.target))
except (TypeError, ValueError):
# self.target might be None
pass
# temperature is stable when all recorded values in the window
# differ from setpoint by less than tolerance
currenttime = time.time()
window.append((currenttime, sample))
while window[0][0] < currenttime - self.window:
# remove old/stale entries
window.pop(0)
# obtain min/max
deviation = 0
for _, T in window:
if abs(T - self.target) > deviation:
deviation = abs(T - self.target)
if (len(window) < 3) or deviation > self.tolerance:
self.status = status.BUSY, 'unstable'
elif self.setpoint == self.target:
self.status = status.OK, 'at target'
damper -= (damper - 1) / 10. # max value for damper is 11
else:
self.status = status.BUSY, 'ramping setpoint'
damper -= (damper - 1) / 20.
self.regulationtemp = round(regulation, 3)
self.sampletemp = round(sample, 3)
self.heaterpower = round(heater * self.maxpower * 0.01, 3)
self.heater = heater
timestamp = t
def shutdown(self):
# should be called from server when the server is stopped
self._stopflag = True
if self._thread and self._thread.isAlive():
self._thread.join()

265
secop/devices/demo.py Normal file
View File

@@ -0,0 +1,265 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
# *****************************************************************************
"""testing devices"""
import time
import random
import threading
from secop.devices.core import Readable, Driveable, PARAM
from secop.validators import *
from secop.protocol import status
class Switch(Driveable):
"""switch it on or off....
"""
PARAMS = {
'value': PARAM('current state (on or off)',
validator=enum(on=1, off=0), default=0),
'target': PARAM('wanted state (on or off)',
validator=enum(on=1, off=0), default=0,
readonly=False),
'switch_on_time': PARAM('how long to wait after switching the switch on', validator=floatrange(0, 60), unit='s', default=10, export=False),
'switch_off_time': PARAM('how long to wait after switching the switch off', validator=floatrange(0, 60), unit='s', default=10, export=False),
}
def init(self):
self._started = 0
def read_value(self, maxage=0):
# could ask HW
# we just return the value of the target here.
self._update()
return self.value
def read_target(self, maxage=0):
# could ask HW
return self.target
def write_target(self, value):
# could tell HW
pass
# note: setting self.target to the new value is done after this....
# note: we may also return the read-back value from the hw here
def read_status(self, maxage=0):
self.log.info("read status")
self._update()
if self.target == self.value:
return status.OK
return status.BUSY
def _update(self):
started = self.PARAMS['target'].timestamp
if self.target > self.value:
if time.time() > started + self.switch_on_time:
self.log.debug('is switched ON')
self.value = self.target
elif self.target < self.value:
if time.time() > started + self.switch_off_time:
self.log.debug('is switched OFF')
self.value = self.target
class MagneticField(Driveable):
"""a liquid magnet
"""
PARAMS = {
'value': PARAM('current field in T', unit='T',
validator=floatrange(-15, 15), default=0),
'ramp': PARAM('moving speed in T/min', unit='T/min',
validator=floatrange(0, 1), default=0.1, readonly=False),
'mode': PARAM('what to do after changing field', default=0,
validator=enum(persistent=1, hold=0), readonly=False),
'heatswitch': PARAM('heat switch device',
validator=str, export=False),
}
def init(self):
self._state = 'idle'
self._heatswitch = self.DISPATCHER.get_device(self.heatswitch)
_thread = threading.Thread(target=self._thread)
_thread.daemon = True
_thread.start()
def read_value(self, maxage=0):
return self.value
def write_target(self, value):
# could tell HW
return round(value, 2)
# note: setting self.target to the new value is done after this....
# note: we may also return the read-back value from the hw here
def read_status(self, maxage=0):
return status.OK if self._state == 'idle' else status.BUSY
def _thread(self):
loopdelay = 1
while True:
ts = time.time()
if self._state == 'idle':
if self.target != self.value:
self.log.debug('got new target -> switching heater on')
self._state = 'switch_on'
self._heatswitch.write_target('on')
if self._state == 'switch_on':
# wait until switch is on
if self._heatswitch.read_value() == 'on':
self.log.debug(
'heatswitch is on -> ramp to %.3f' %
self.target)
self._state = 'ramp'
if self._state == 'ramp':
if self.target == self.value:
self.log.debug('at field! mode is %r' % self.mode)
if self.mode:
self.log.debug('at field -> switching heater off')
self._state = 'switch_off'
self._heatswitch.write_target('off')
else:
self.log.debug('at field -> hold')
self._state = 'idle'
self.status = self.read_status() # push async
else:
step = self.ramp * loopdelay / 60.
step = max(min(self.target - self.value, step), -step)
self.value += step
if self._state == 'switch_off':
# wait until switch is off
if self._heatswitch.read_value() == 'off':
self.log.debug('heatswitch is off at %.3f' % self.value)
self._state = 'idle'
self.read_status() # update async
time.sleep(max(0.01, ts + loopdelay - time.time()))
self.log.error(self, 'main thread exited unexpectedly!')
class CoilTemp(Readable):
"""a coil temperature
"""
PARAMS = {
'value': PARAM('Coil temperatur in K', unit='K',
validator=float, default=0),
'sensor': PARAM("Sensor number or calibration id",
validator=str, readonly=True),
}
def read_value(self, maxage=0):
return round(2.3 + random.random(), 3)
class SampleTemp(Driveable):
"""a sample temperature
"""
PARAMS = {
'value': PARAM('Sample temperatur in K', unit='K',
validator=float, default=10),
'sensor': PARAM("Sensor number or calibration id",
validator=str, readonly=True),
'ramp': PARAM('moving speed in K/min',
validator=floatrange(0, 100), unit='K/min', default=0.1, readonly=False),
}
def init(self):
_thread = threading.Thread(target=self._thread)
_thread.daemon = True
_thread.start()
def write_target(self, value):
# could tell HW
return round(value, 2)
# note: setting self.target to the new value is done after this....
# note: we may also return the read-back value from the hw here
def _thread(self):
loopdelay = 1
while True:
ts = time.time()
if self.value == self.target:
if self.status != status.OK:
self.status = status.OK
else:
self.status = status.BUSY
step = self.ramp * loopdelay / 60.
step = max(min(self.target - self.value, step), -step)
self.value += step
time.sleep(max(0.01, ts + loopdelay - time.time()))
self.log.error(self, 'main thread exited unexpectedly!')
class Label(Readable):
"""
"""
PARAMS = {
'system': PARAM("Name of the magnet system",
validator=str, export=False),
'subdev_mf': PARAM("name of subdevice for magnet status",
validator=str, export=False),
'subdev_ts': PARAM("name of subdevice for sample temp",
validator=str, export=False),
'value': PARAM("Value of out label string",
validator=str)
}
def read_value(self, maxage=0):
strings = [self.system]
dev_ts = self.DISPATCHER.get_device(self.subdev_ts)
if dev_ts:
strings.append('at %.3f %s' %
(dev_ts.read_value(), dev_ts.PARAMS['value'].unit))
else:
strings.append('No connection to sample temp!')
dev_mf = self.DISPATCHER.get_device(self.subdev_mf)
if dev_mf:
mf_stat = dev_mf.read_status()
mf_mode = dev_mf.mode
mf_val = dev_mf.value
mf_unit = dev_mf.PARAMS['value'].unit
if mf_stat == status.OK:
state = 'Persistent' if mf_mode else 'Non-persistent'
else:
state = 'ramping'
strings.append('%s at %.1f %s' % (state, mf_val, mf_unit))
else:
strings.append('No connection to magnetic field!')
return '; '.join(strings)
class ValidatorTest(Readable):
"""
"""
PARAMS = {
'oneof': PARAM('oneof', validator=oneof(int, 'X', 2.718), readonly=False, default=4.0),
'enum': PARAM('enum', validator=enum('boo', 'faar', z=9), readonly=False, default=1),
'vector': PARAM('vector of int, float and str', validator=vector(int, float, str), readonly=False, default=(1, 2.3, 'a')),
'array': PARAM('array: 2..3 time oneof(0,1)', validator=array(oneof(2, 3), oneof(0, 1)), readonly=False, default=[1, 0, 1]),
'nonnegative': PARAM('nonnegative', validator=nonnegative(), readonly=False, default=0),
'positive': PARAM('positive', validator=positive(), readonly=False, default=1),
'intrange': PARAM('intrange', validator=intrange(2, 9), readonly=False, default=4),
'floatrange': PARAM('floatrange', validator=floatrange(-1, 1), readonly=False, default=0,),
}

51
secop/devices/epics.py Normal file
View File

@@ -0,0 +1,51 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
# *****************************************************************************
"""testing devices"""
import random
from secop.devices.core import Readable, Driveable, PARAM
try:
from epics import PV
except ImportError:
PV = None
class EPICS_PV(Driveable):
"""pyepics test device."""
PARAMS = {
'sensor': PARAM("Sensor number or calibration id",
validator=str, readonly=True),
'max_rpm': PARAM("Maximum allowed rpm",
validator=str, readonly=True),
}
def read_value(self, maxage=0):
p1 = PV('testpv.VAL')
return p1.value
def write_target(self, target):
p1 = PV('test.VAL')
p1.value = target

74
secop/devices/test.py Normal file
View File

@@ -0,0 +1,74 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
# *****************************************************************************
"""testing devices"""
import random
from secop.devices.core import Readable, Driveable, PARAM
from secop.validators import floatrange
class LN2(Readable):
"""Just a readable.
class name indicates it to be a sensor for LN2,
but the implementation may do anything
"""
def read_value(self, maxage=0):
return round(100 * random.random(), 1)
class Heater(Driveable):
"""Just a driveable.
class name indicates it to be some heating element,
but the implementation may do anything
"""
PARAMS = {
'maxheaterpower': PARAM('maximum allowed heater power',
validator=floatrange(0, 100), unit='W'),
}
def read_value(self, maxage=0):
return round(100 * random.random(), 1)
def write_target(self, target):
pass
class Temp(Driveable):
"""Just a driveable.
class name indicates it to be some temperature controller,
but the implementation may do anything
"""
PARAMS = {
'sensor': PARAM("Sensor number or calibration id",
validator=str, readonly=True),
}
def read_value(self, maxage=0):
return round(100 * random.random(), 1)
def write_target(self, target):
pass