merge until "support write_ method on readonly param and more"

from gerrit

Change-Id: I8d2ad2a381d3a37947d8afc5e17be0428d94df36
This commit is contained in:
2022-03-21 14:00:12 +01:00
parent 7c9296fe2e
commit b1ddc01fbb
24 changed files with 604 additions and 436 deletions

View File

@ -72,7 +72,7 @@ class HeLevel(HasIO, Readable):
"""
name, txtvalue = self.communicate(cmd).split('=')
assert name == cmd.split('=')[0] # check that we got a reply to our command
return txtvalue # Frappy will automatically convert the string to the needed data type
return float(txtvalue)
def read_value(self):
return self.query('h')

140
secop_psi/historywriter.py Normal file
View File

@ -0,0 +1,140 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
import time
import frappyhistory # pylint: disable=import-error
from secop.datatypes import get_datatype, IntRange, FloatRange, ScaledInteger,\
EnumType, BoolType, StringType, TupleOf, StructOf
def make_cvt_list(dt, tail=''):
"""create conversion list
list of tuple (<conversion function>, <tail>, <curve options>)
tail is a postfix to be appended in case of tuples and structs
"""
if isinstance(dt, (EnumType, IntRange, BoolType)):
return[(int, tail, dict(type='NUM'))]
if isinstance(dt, (FloatRange, ScaledInteger)):
return [(dt.import_value, tail, dict(type='NUM', unit=dt.unit, period=5) if dt.unit else {})]
if isinstance(dt, StringType):
return [(lambda x: x, tail, dict(type='STR'))]
if isinstance(dt, TupleOf):
items = enumerate(dt.members)
elif isinstance(dt, StructOf):
items = dt.members.items()
else:
return [] # ArrayType, BlobType and TextType are ignored: too much data, probably not used
result = []
for subkey, elmtype in items:
for fun, tail_, opts in make_cvt_list(elmtype, '%s.%s' % (tail, subkey)):
result.append((lambda v, k=subkey, f=fun: f(v[k]), tail_, opts))
return result
class FrappyHistoryWriter(frappyhistory.FrappyWriter):
"""extend writer to be used as an internal frappy connection
API of frappyhistory.FrappyWriter:
:meth:`put_def`(key, opts):
define or overwrite a new curve named <key> with options from dict <opts>
options:
- type:
'NUM' (any number) or 'STR' (text)
remark: tuples and structs create multiple curves
- period:
the typical 'lifetime' of a value.
The intention is, that points in a chart may be connected by a straight line
when the distance is lower than this value. If not, the line should be drawn
horizontally from the last point to a point <period> before the next value.
For example a setpoint should have period 0, which will lead to a stepped
line, whereas for a measured value like a temperature, period should be
slightly bigger than the poll interval. In order to make full use of this,
we would need some additional parameter property.
- show: True/False, whether this curve should be shown or not by default in
a summary chart
- label: a label for the curve in the chart
:meth:`put`(timestamp, key, value)
timestamp: the timestamp. must not decrease!
key: the curve name
value: the value to be stored, converted to a string. '' indicates an undefined value
self.cache is a dict <key> of <value as string>, containing the last used value
"""
def __init__(self, directory, predefined_names, dispatcher):
super().__init__(directory)
self.predefined_names = predefined_names
self.cvt_lists = {} # dict <mod:param> of <conversion list>
self.activated = False
self.dispatcher = dispatcher
self._init_time = None
def init(self, msg):
"""initialize from the 'describing' message"""
action, _, description = msg
assert action == 'describing'
self._init_time = time.time()
for modname, moddesc in description['modules'].items():
for pname, pdesc in moddesc['accessibles'].items():
ident = key = modname + ':' + pname
if pname.startswith('_') and pname[1:] not in self.predefined_names:
key = modname + ':' + pname[1:]
dt = get_datatype(pdesc['datainfo'])
cvt_list = make_cvt_list(dt, key)
for _, hkey, opts in cvt_list:
if pname == 'value':
opts['period'] = opts.get('period', 0)
opts['show'] = True
opts['label'] = modname
elif pname == 'target':
opts['period'] = 0
opts['label'] = modname + '_target'
opts['show'] = True
self.put_def(hkey, opts)
self.cvt_lists[ident] = cvt_list
# self.put(self._init_time, 'STR', 'vars', ' '.join(vars))
self.dispatcher.handle_activate(self, None, None)
self._init_time = None
def send_reply(self, msg):
action, ident, value = msg
if not action.endswith('update'):
print('unknown async message %r' % msg)
return
now = self._init_time or time.time() # on initialisation, use the same timestamp for all
if action == 'update':
for fun, key, _ in self.cvt_lists[ident]:
# we only look at the value, qualifiers are ignored for now
# we do not use the timestamp here, as a potentially decreasing value might
# bring the reader software into trouble
self.put(now, key, str(fun(value[0])))
else: # error_update
for _, key, _ in self.cvt_lists[ident]:
old = self.cache.get(key)
if old is None:
return # ignore if this key is not yet used
self.put(now, key, '')

View File

@ -18,12 +18,18 @@
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""Keithley 2601B source meter
"""Keithley 2601B 4 quadrant source meter
not tested yet"""
not tested yet
from secop.core import Attached, BoolType, EnumType, FloatRange, \
HasIodev, Module, Parameter, StringIO, Writable, Done
* switching between voltage and current happens by setting their target
* switching output off by setting the active parameter of the controlling
module to False.
* setting the active parameter to True raises an error
"""
from secop.core import Attached, BoolType, Done, EnumType, FloatRange, \
HasIO, Module, Parameter, Readable, StringIO, Writable
class K2601bIO(StringIO):
@ -32,128 +38,162 @@ class K2601bIO(StringIO):
SOURCECMDS = {
0: 'reset()'
' smua.source.output = 0 print("ok")',
1: 'reset()'
' smua.source.func = smua.OUTPUT_DCAMPS'
' display.smua.measure.func = display.MEASURE_VOLTS'
' smua.source.autorangei = 1'
' smua.source.output = %d print("ok"")',
1: 'reset()'
' smua.source.output = 1 print("ok")',
2: 'reset()'
' smua.source.func = smua.OUTPUT_DCVOLTS'
' display.smua.measure.func = display.MEASURE_DCAMPS'
' smua.source.autorangev = 1'
' smua.source.output = %d print("ok"")',
' smua.source.output = 1 print("ok")',
}
class SourceMeter(HasIodev, Module):
class SourceMeter(HasIO, Module):
export = False # export for tests only
mode = Parameter('measurement mode', EnumType(off=0, current=1, voltage=2),
readonly=False, export=False)
ilimit = Parameter('current limit', FloatRange(0, 2.0, unit='A'), default=2)
vlimit = Parameter('voltage limit', FloatRange(0, 2.0, unit='V'), default=2)
resistivity = Parameter('readback resistivity', FloatRange(unit='Ohm'), poll=True)
power = Parameter('readback power', FloatRange(unit='W'), poll=True)
mode = Parameter('measurement mode', EnumType(current_off=0, voltage_off=1, current_on=2, voltage_on=3),
readonly=False, poll=True)
iodevClass = K2601bIO
def read_resistivity(self):
return self.sendRecv('print(smua.measure.r())')
def read_power(self):
return self.sendRecv('print(smua.measure.p())')
ioClass = K2601bIO
def read_mode(self):
return float(self.sendRecv('print(smua.source.func+2*smua.source.output)'))
return float(self.communicate('print((smua.source.func+1)*smua.source.output)'))
def write_mode(self, value):
assert 'ok' == self.sendRecv(SOURCECMDS[value % 2] % (value >= 2))
if value == 'current':
self.write_vlimit(self.vlimit)
elif value == 'voltage':
self.write_ilimit(self.ilimit)
assert self.communicate(SOURCECMDS[value]) == 'ok'
return self.read_mode()
def read_ilimit(self):
if self.mode == 'current':
return self.ilimit
return float(self.communicate('print(smua.source.limiti)'))
class Current(HasIodev, Writable):
sourcemeter = Attached()
def write_ilimit(self, value):
if self.mode == 'current':
return self.ilimit
return float(self.communicate('smua.source.limiti = %g print(smua.source.limiti)' % value))
value = Parameter('measured current', FloatRange(unit='A'), poll=True)
target = Parameter('set current', FloatRange(unit='A'), poll=True)
active = Parameter('current is controlled', BoolType(), default=False) # polled by SourceMeter
limit = Parameter('current limit', FloatRange(0, 2.0, unit='A'), default=2, poll=True)
def read_vlimit(self):
if self.mode == 'voltage':
return self.ilimit
return float(self.communicate('print(smua.source.limitv)'))
def initModule(self):
self._sourcemeter.registerCallbacks(self)
def write_vlimit(self, value):
if self.mode == 'voltage':
return self.ilimit
return float(self.communicate('smua.source.limitv = %g print(smua.source.limitv)' % value))
class Power(HasIO, Readable):
value = Parameter('readback power', FloatRange(unit='W'))
ioClass = K2601bIO
def read_value(self):
return self.sendRecv('print(smua.measure.i())')
return float(self.communicate('print(smua.measure.p())'))
class Resistivity(HasIO, Readable):
value = Parameter('readback resistivity', FloatRange(unit='Ohm'))
ioClass = K2601bIO
def read_value(self):
return float(self.communicate('print(smua.measure.r())'))
class Current(HasIO, Writable):
sourcemeter = Attached()
value = Parameter('measured current', FloatRange(unit='A'))
target = Parameter('set current', FloatRange(unit='A'))
active = Parameter('current is controlled', BoolType(), default=False)
limit = Parameter('current limit', FloatRange(0, 2.0, unit='A'), default=2)
def initModule(self):
self.sourcemeter.registerCallbacks(self)
def read_value(self):
return float(self.communicate('print(smua.measure.i())'))
def read_target(self):
return self.sendRecv('print(smua.source.leveli)')
return float(self.communicate('print(smua.source.leveli)'))
def write_target(self, value):
if not self.active:
raise ValueError('current source is disabled')
if value > self.limit:
if value > self.sourcemeter.ilimit:
raise ValueError('current exceeds limit')
return self.sendRecv('smua.source.leveli = %g print(smua.source.leveli)' % value)
value = float(self.communicate('smua.source.leveli = %g print(smua.source.leveli)' % value))
if not self.active:
self.sourcemeter.write_mode('current') # triggers update_mode -> set active to True
return value
def read_limit(self):
if self.active:
return self.limit
return self.sendRecv('print(smua.source.limiti)')
return self.sourcemeter.read_ilimit()
def write_limit(self, value):
if self.active:
return value
return self.sendRecv('smua.source.limiti = %g print(smua.source.limiti)' % value)
def write_active(self, value):
if value:
self._sourcemeter.write_mode('current_on')
elif self._sourcemeter.mode == 'current_on':
self._sourcemeter.write_mode('current_off')
return self.active
return self.sourcemeter.write_ilimit(value)
def update_mode(self, mode):
# will be called whenever the attached sourcemeters mode changes
self.active = mode == 'current_on'
self.active = mode == 'current'
def write_active(self, value):
self.sourcemeter.read_mode()
if value == self.value:
return Done
if value:
raise ValueError('activate only by setting target')
self.sourcemeter.write_mode('off') # triggers update_mode -> set active to False
return Done
class Voltage(HasIodev, Writable):
class Voltage(HasIO, Writable):
sourcemeter = Attached()
value = Parameter('measured voltage', FloatRange(unit='V'), poll=True)
target = Parameter('set voltage', FloatRange(unit='V'), poll=True)
active = Parameter('voltage is controlled', BoolType(), default=False) # polled by SourceMeter
limit = Parameter('current limit', FloatRange(0, 2.0, unit='V'), default=2, poll=True)
value = Parameter('measured voltage', FloatRange(unit='V'))
target = Parameter('set voltage', FloatRange(unit='V'))
active = Parameter('voltage is controlled', BoolType())
limit = Parameter('voltage limit', FloatRange(0, 2.0, unit='V'), default=2)
def initModule(self):
self._sourcemeter.registerCallbacks(self)
self.sourcemeter.registerCallbacks(self)
def read_value(self):
return self.sendRecv('print(smua.measure.v())')
return float(self.communicate('print(smua.measure.v())'))
def read_target(self):
return self.sendRecv('print(smua.source.levelv)')
return float(self.communicate('print(smua.source.levelv)'))
def write_target(self, value):
if not self.active:
raise ValueError('voltage source is disabled')
if value > self.limit:
if value > self.sourcemeter.vlimit:
raise ValueError('voltage exceeds limit')
return self.sendRecv('smua.source.levelv = %g print(smua.source.levelv)' % value)
value = float(self.communicate('smua.source.levelv = %g print(smua.source.levelv)' % value))
if not self.active:
self.sourcemeter.write_mode('voltage') # triggers update_mode -> set active to True
return value
def read_limit(self):
if self.active:
return self.limit
return self.sendRecv('print(smua.source.limitv)')
return self.sourcemeter.read_vlimit()
def write_limit(self, value):
if self.active:
return value
return self.sendRecv('smua.source.limitv = %g print(smua.source.limitv)' % value)
def write_active(self, value):
if value:
self._sourcemeter.write_mode('voltage_on')
elif self._sourcemeter.mode == 'voltage_on':
self._sourcemeter.write_mode('voltage_off')
return self.active
return self.sourcemeter.write_vlimit(value)
def update_mode(self, mode):
# will be called whenever the attached sourcemeters mode changes
self.active = mode == 'voltage_on'
self.active = mode == 'voltage'
def write_active(self, value):
self.sourcemeter.read_mode()
if value == self.value:
return Done
if value:
raise ValueError('activate only by setting target')
self.sourcemeter.write_mode('off') # triggers update_mode -> set active to False
return Done

View File

@ -27,8 +27,7 @@ from secop.datatypes import BoolType, EnumType, FloatRange, IntRange
from secop.lib import formatStatusBits
from secop.modules import Attached, Done, \
Drivable, Parameter, Property, Readable
from secop.poller import REGULAR, Poller
from secop.io import HasIodev
from secop.io import HasIO
Status = Drivable.Status
@ -58,29 +57,29 @@ class StringIO(secop.io.StringIO):
wait_before = 0.05
class Main(HasIodev, Drivable):
class Main(HasIO, Drivable):
value = Parameter('the current channel', poll=REGULAR, datatype=IntRange(0, 17))
value = Parameter('the current channel', datatype=IntRange(0, 17))
target = Parameter('channel to select', datatype=IntRange(0, 17))
autoscan = Parameter('whether to scan automatically', datatype=BoolType(), readonly=False, default=False)
pollinterval = Parameter(default=1, export=False)
pollerClass = Poller
iodevClass = StringIO
ioClass = StringIO
_channel_changed = 0 # time of last channel change
_channels = None # dict <channel no> of <module object>
def earlyInit(self):
super().earlyInit()
self._channels = {}
def register_channel(self, modobj):
self._channels[modobj.channel] = modobj
def startModule(self, started_callback):
started_callback()
def startModule(self, start_events):
super().startModule(start_events)
for ch in range(1, 16):
if ch not in self._channels:
self.sendRecv('INSET %d,0,0,0,0,0;INSET?%d' % (ch, ch))
self.communicate('INSET %d,0,0,0,0,0;INSET?%d' % (ch, ch))
def read_value(self):
channel, auto = scan.send_command(self)
@ -113,7 +112,7 @@ class Main(HasIodev, Drivable):
def write_target(self, channel):
scan.send_change(self, channel, self.autoscan)
# self.sendRecv('SCAN %d,%d;SCAN?' % (channel, self.autoscan))
# self.communicate('SCAN %d,%d;SCAN?' % (channel, self.autoscan))
if channel != self.value:
self.value = 0
self._channel_changed = time.time()
@ -122,11 +121,11 @@ class Main(HasIodev, Drivable):
def write_autoscan(self, value):
scan.send_change(self, self.value, value)
# self.sendRecv('SCAN %d,%d;SCAN?' % (channel, self.autoscan))
# self.communicate('SCAN %d,%d;SCAN?' % (channel, self.autoscan))
return value
class ResChannel(HasIodev, Readable):
class ResChannel(HasIO, Readable):
"""temperature channel on Lakeshore 336"""
RES_RANGE = {key: i+1 for i, key in list(
@ -140,8 +139,7 @@ class ResChannel(HasIodev, Readable):
enumerate(mag % val for mag in ['%guV', '%gmV']
for val in [2, 6.32, 20, 63.2, 200, 632]))}
pollerClass = Poller
iodevClass = StringIO
ioClass = StringIO
_main = None # main module
_last_range_change = 0 # time of last range change
@ -166,6 +164,7 @@ class ResChannel(HasIodev, Readable):
_trigger_read = False
def initModule(self):
super().initModule()
self._main = self.DISPATCHER.get_module(self.main)
self._main.register_channel(self)
@ -181,7 +180,7 @@ class ResChannel(HasIodev, Readable):
return Done
# we got here, when we missed the idle state of self._main
self._trigger_read = False
result = self.sendRecv('RDGR?%d' % self.channel)
result = self.communicate('RDGR?%d' % self.channel)
result = float(result)
if self.autorange == 'soft':
now = time.time()
@ -214,9 +213,9 @@ class ResChannel(HasIodev, Readable):
def read_status(self):
if not self.enabled:
return [self.Status.DISABLED, 'disabled']
if self.channel != self._main.value:
if self.channel != self.main.value:
return Done
result = int(self.sendRecv('RDGST?%d' % self.channel))
result = int(self.communicate('RDGST?%d' % self.channel))
result &= 0x37 # mask T_OVER and T_UNDER (change this when implementing temperatures instead of resistivities)
statustext = ' '.join(formatStatusBits(result, STATUS_BIT_LABELS))
if statustext:
@ -228,7 +227,7 @@ class ResChannel(HasIodev, Readable):
if autorange:
result['autorange'] = 'hard'
# else: do not change autorange
# self.log.info('%s range %r %r %r' % (self.name, rng, autorange, self.autorange))
self.log.debug('%s range %r %r %r' % (self.name, rng, autorange, self.autorange))
if excoff:
result.update(iexc=0, vexc=0)
elif iscur:
@ -281,5 +280,5 @@ class ResChannel(HasIodev, Readable):
def write_enabled(self, value):
inset.write(self, 'enabled', value)
if value:
self._main.write_target(self.channel)
self.main.write_target(self.channel)
return Done

View File

@ -24,13 +24,12 @@
import time
import struct
from math import log10
from secop.core import BoolType, Command, EnumType, FloatRange, IntRange, \
HasIodev, Parameter, Property, Drivable, PersistentMixin, PersistentParam
HasIO, Parameter, Property, Drivable, PersistentMixin, PersistentParam, Done
from secop.io import BytesIO
from secop.errors import CommunicationFailedError, HardwareError, BadValueError, IsBusyError
from secop.rwhandler import ReadHandler, WriteHandler
MOTOR_STOP = 3
MOVE = 4
@ -50,84 +49,78 @@ MAX_SPEED = 2047 * SPEED_SCALE
ACCEL_SCALE = 1E12 / 2 ** 31 * ANGLE_SCALE
MAX_ACCEL = 2047 * ACCEL_SCALE
CURRENT_SCALE = 2.8/250
ENCODER_RESOLUTION = 0.4 # 360 / 1024, rounded up
ENCODER_RESOLUTION = 360 / 1024
HW_ARGS = {
# <parameter name>: (address, scale factor)
'encoder_tolerance': (212, ANGLE_SCALE),
'speed': (4, SPEED_SCALE),
'minspeed': (130, SPEED_SCALE),
'currentspeed': (3, SPEED_SCALE),
'maxcurrent': (6, CURRENT_SCALE),
'standby_current': (7, CURRENT_SCALE,),
'acceleration': (5, ACCEL_SCALE),
'target_reached': (8, 1),
'move_status': (207, 1),
'error_bits': (208, 1),
'free_wheeling': (204, 0.01),
'power_down_delay': (214, 0.01),
}
# special handling (adjust zero):
ENCODER_ADR = 209
STEPPOS_ADR = 1
class HwParam(PersistentParam):
adr = Property('parameter address', IntRange(0, 255), export=False)
scale = Property('scale factor (physical value / unit)', FloatRange(), export=False)
def __init__(self, description, datatype, adr, scale=1, poll=True,
readonly=True, persistent=None, **kwds):
"""hardware parameter"""
if persistent is None:
persistent = not readonly
if isinstance(datatype, FloatRange) and datatype.fmtstr == '%g':
datatype.fmtstr = '%%.%df' % max(0, 1 - int(log10(scale) + 0.01))
super().__init__(description, datatype, poll=poll, adr=adr, scale=scale,
persistent=persistent, readonly=readonly, **kwds)
def copy(self):
res = HwParam(self.description, self.datatype.copy(), self.adr)
res.name = self.name
res.init(self.propertyValues)
return res
def writable(*args, **kwds):
"""convenience function to create writable hardware parameters"""
return PersistentParam(*args, readonly=False, initwrite=True, **kwds)
class Motor(PersistentMixin, HasIodev, Drivable):
class Motor(PersistentMixin, HasIO, Drivable):
address = Property('module address', IntRange(0, 255), default=1)
value = Parameter('motor position', FloatRange(unit='deg', fmtstr='%.3f'), poll=False, default=0) # polling by read_status
value = Parameter('motor position', FloatRange(unit='deg', fmtstr='%.3f'))
zero = PersistentParam('zero point', FloatRange(unit='$'), readonly=False, default=0)
encoder = HwParam('encoder reading', FloatRange(unit='$', fmtstr='%.1f'),
209, ANGLE_SCALE, readonly=True, initwrite=False, persistent=True)
steppos = HwParam('position from motor steps', FloatRange(unit='$'),
1, ANGLE_SCALE, readonly=True, initwrite=False)
target = Parameter('_', FloatRange(unit='$'), default=0)
encoder = PersistentParam('encoder reading', FloatRange(unit='$', fmtstr='%.1f'),
readonly=True, initwrite=False)
steppos = PersistentParam('position from motor steps', FloatRange(unit='$', fmtstr='%.3f'),
readonly=True, initwrite=False)
target = Parameter('', FloatRange(unit='$'), default=0)
move_limit = Parameter('max. angle to drive in one go when current above safe_current', FloatRange(unit='$'),
readonly=False, default=5, group='more')
safe_current = Parameter('motor current allowed for big steps', FloatRange(unit='A'),
readonly=False, default=0.2, group='more')
move_limit = Parameter('max. angle to drive in one go', FloatRange(unit='$'),
readonly=False, default=360, group='more')
tolerance = Parameter('positioning tolerance', FloatRange(unit='$'),
readonly=False, default=0.9)
encoder_tolerance = HwParam('the allowed deviation between steppos and encoder\n\nmust be > tolerance',
FloatRange(0, 360., unit='$'),
212, ANGLE_SCALE, readonly=False, group='more')
speed = HwParam('max. speed', FloatRange(0, MAX_SPEED, unit='$/sec'),
4, SPEED_SCALE, readonly=False, group='motorparam')
minspeed = HwParam('min. speed', FloatRange(0, MAX_SPEED, unit='$/sec'),
130, SPEED_SCALE, readonly=False, default=SPEED_SCALE, group='motorparam')
currentspeed = HwParam('current speed', FloatRange(-MAX_SPEED, MAX_SPEED, unit='$/sec'),
3, SPEED_SCALE, readonly=True, group='motorparam')
maxcurrent = HwParam('_', FloatRange(0, 2.8, unit='A'),
6, CURRENT_SCALE, readonly=False, group='motorparam')
standby_current = HwParam('_', FloatRange(0, 2.8, unit='A'),
7, CURRENT_SCALE, readonly=False, group='motorparam')
acceleration = HwParam('_', FloatRange(4.6 * ACCEL_SCALE, MAX_ACCEL, unit='deg/s^2'),
5, ACCEL_SCALE, readonly=False, group='motorparam')
target_reached = HwParam('_', BoolType(), 8, group='hwstatus')
move_status = HwParam('_', IntRange(0, 3),
207, readonly=True, group='hwstatus')
error_bits = HwParam('_', IntRange(0, 255),
208, readonly=True, group='hwstatus')
# the doc says msec, but I believe the scale is 10 msec
free_wheeling = HwParam('_', FloatRange(0, 60., unit='sec'),
204, 0.01, default=0.1, readonly=False, group='motorparam')
power_down_delay = HwParam('_', FloatRange(0, 60., unit='sec'),
214, 0.01, default=0.1, readonly=False, group='motorparam')
baudrate = Parameter('_', EnumType({'%d' % v: i for i, v in enumerate(BAUDRATES)}),
readonly=False, default=0, poll=True, visibility=3, group='more')
encoder_tolerance = writable('the allowed deviation between steppos and encoder\n\nmust be > tolerance',
FloatRange(0, 360., unit='$', fmtstr='%.3f'), group='more')
speed = writable('max. speed', FloatRange(0, MAX_SPEED, unit='$/sec', fmtstr='%.1f'), default=40)
minspeed = writable('min. speed', FloatRange(0, MAX_SPEED, unit='$/sec', fmtstr='%.1f'),
default=SPEED_SCALE, group='motorparam')
currentspeed = Parameter('current speed', FloatRange(-MAX_SPEED, MAX_SPEED, unit='$/sec', fmtstr='%.1f'),
group='motorparam')
maxcurrent = writable('', FloatRange(0, 2.8, unit='A', fmtstr='%.2f'),
default=1.4, group='motorparam')
standby_current = writable('', FloatRange(0, 2.8, unit='A', fmtstr='%.2f'),
default=0.1, group='motorparam')
acceleration = writable('', FloatRange(4.6 * ACCEL_SCALE, MAX_ACCEL, unit='deg/s^2', fmtstr='%.1f'),
default=150., group='motorparam')
target_reached = Parameter('', BoolType(), group='hwstatus')
move_status = Parameter('', IntRange(0, 3), group='hwstatus')
error_bits = Parameter('', IntRange(0, 255), group='hwstatus')
free_wheeling = writable('', FloatRange(0, 60., unit='sec', fmtstr='%.2f'),
default=0.1, group='motorparam')
power_down_delay = writable('', FloatRange(0, 60., unit='sec', fmtstr='%.2f'),
default=0.1, group='motorparam')
baudrate = Parameter('', EnumType({'%d' % v: i for i, v in enumerate(BAUDRATES)}),
readonly=False, default=0, visibility=3, group='more')
pollinterval = Parameter(group='more')
iodevClass = BytesIO
# fast_pollfactor = 0.001 # poll as fast as possible when busy
fast_pollfactor = 0.05
ioClass = BytesIO
fast_pollfactor = 0.001 # not used any more, TODO: use a statemachine for running
_started = 0
_calc_timeout = True
_calcTimeout = True
_need_reset = None
_last_change = 0
def comm(self, cmd, adr, value=0, bank=0):
"""set or get a parameter
@ -138,21 +131,23 @@ class Motor(PersistentMixin, HasIodev, Drivable):
:param value: if given, the parameter is written, else it is returned
:return: the returned value
"""
if self._calc_timeout:
self._calc_timeout = False
baudrate = getattr(self._iodev._conn.connection, 'baudrate', None)
if self._calcTimeout and self.io._conn:
self._calcTimeout = False
baudrate = getattr(self.io._conn.connection, 'baudrate', None)
if baudrate:
if baudrate not in BAUDRATES:
raise CommunicationFailedError('unsupported baud rate: %d' % baudrate)
self._iodev.timeout = 0.03 + 200 / baudrate
self.io.timeout = 0.03 + 200 / baudrate
exc = None
byt = struct.pack('>BBBBi', self.address, cmd, adr, bank, round(value))
byt += bytes([sum(byt) & 0xff])
for itry in range(3,0,-1):
byt = struct.pack('>BBBBi', self.address, cmd, adr, bank, round(value))
try:
reply = self._iodev.communicate(byt + bytes([sum(byt) & 0xff]), 9)
reply = self.communicate(byt, 9)
if sum(reply[:-1]) & 0xff != reply[-1]:
raise CommunicationFailedError('checksum error')
# will try again
except Exception as e:
if itry == 1:
raise
@ -168,36 +163,18 @@ class Motor(PersistentMixin, HasIodev, Drivable):
raise CommunicationFailedError('bad reply %r to command %s %d' % (reply, cmd, adr))
return result
def get(self, pname, **kwds):
"""get parameter"""
pobj = self.parameters[pname]
value = self.comm(GET_AXIS_PAR, pobj.adr, **kwds)
# do not apply scale when 1 (datatype might not be float)
return value if pobj.scale == 1 else value * pobj.scale
def startModule(self, start_events):
super().startModule(start_events)
def set(self, pname, value, check=True, **kwds):
"""set parameter and check result"""
pobj = self.parameters[pname]
scale = pobj.scale
rawvalue = round(value / scale)
for itry in range(2):
self.comm(SET_AXIS_PAR, pobj.adr, rawvalue, **kwds)
if check:
result = self.comm(GET_AXIS_PAR, pobj.adr, **kwds)
if result != rawvalue:
self.log.warning('result for %s does not match %d != %d, try again', pname, result, rawvalue)
continue
value = result * scale
return value
else:
raise HardwareError('result for %s does not match %d != %d' % (pname, result, rawvalue))
def fix_encoder(self=self):
try:
# get encoder value from motor. at this stage self.encoder contains the persistent value
encoder = self._read_axispar(ENCODER_ADR, ANGLE_SCALE) + self.zero
self.fix_encoder(encoder)
except Exception as e:
self.log.error('fix_encoder failed with %r', e)
def startModule(self, started_callback):
# get encoder value from motor. at this stage self.encoder contains the persistent value
encoder = self.get('encoder')
encoder += self.zero
self.fix_encoder(encoder)
super().startModule(started_callback)
start_events.queue(fix_encoder)
def fix_encoder(self, encoder_from_hw):
"""fix encoder value
@ -211,14 +188,52 @@ class Motor(PersistentMixin, HasIodev, Drivable):
# calculate nearest, most probable value
adjusted_encoder = encoder_from_hw + round((self.encoder - encoder_from_hw) / 360.) * 360
if abs(self.encoder - adjusted_encoder) >= self.encoder_tolerance:
# encoder module0 360 has changed
# encoder modulo 360 has changed
self.log.error('saved encoder value (%.2f) does not match reading (%.2f %.2f)',
self.encoder, encoder_from_hw, adjusted_encoder)
if adjusted_encoder != encoder_from_hw:
self.log.info('take next closest encoder value (%.2f)' % adjusted_encoder)
self._need_reset = True
self.status = self.Status.ERROR, 'saved encoder value does not match reading'
self.set('encoder', adjusted_encoder - self.zero, check=False)
self._write_axispar(adjusted_encoder - self.zero, ENCODER_ADR, ANGLE_SCALE, readback=False)
def _read_axispar(self, adr, scale=1):
value = self.comm(GET_AXIS_PAR, adr)
# do not apply scale when 1 (datatype might not be float)
return value if scale == 1 else value * scale
def _write_axispar(self, value, adr, scale=1, readback=True):
rawvalue = round(value / scale)
self.comm(SET_AXIS_PAR, adr, rawvalue)
if readback:
result = self.comm(GET_AXIS_PAR, adr)
if result != rawvalue:
raise HardwareError('result for adr=%d scale=%g does not match %g != %g'
% (adr, scale, result * scale, value))
return result * scale
return rawvalue * scale
@ReadHandler(HW_ARGS)
def read_hwparam(self, pname):
"""handle read for HwParam"""
args = HW_ARGS[pname]
reply = self._read_axispar(*args)
try:
value = getattr(self, pname)
except Exception:
return reply
if reply != value:
if not self.parameters[pname].readonly:
# this should not happen
self.log.warning('hw parameter %s has changed from %r to %r, write again', pname, value, reply)
self._write_axispar(value, *args, readback=False)
reply = self._read_axispar(*args)
return reply
@WriteHandler(HW_ARGS)
def write_hwparam(self, pname, value):
"""handler write for HwParam"""
return self._write_axispar(value, *HW_ARGS[pname])
def read_value(self):
encoder = self.read_encoder()
@ -240,7 +255,7 @@ class Motor(PersistentMixin, HasIodev, Drivable):
self._need_reset = True
self.status = self.Status.ERROR, 'power loss'
# or should we just fix instead of error status?
# self.set('steppos', self.steppos - self.zero, check=False)
# self._write_axispar(self.steppos - self.zero, readback=False)
self.comm(SET_GLOB_PAR, 255, 1, bank=2) # set initialized flag
self._started = 0
@ -256,13 +271,10 @@ class Motor(PersistentMixin, HasIodev, Drivable):
self.log.error('encoder (%.2f) does not match internal pos (%.2f)', self.encoder, self.steppos)
return self.Status.ERROR, 'encoder does not match internal pos'
return self.status
now = self.parameters['steppos'].timestamp
if self.steppos != oldpos:
self._last_change = now
return self.Status.BUSY, 'moving'
if now < self._last_change + 0.2 and not (self.read_target_reached() or self.read_move_status()
or self.read_error_bits()):
if oldpos != self.steppos or not (self.read_target_reached() or self.read_move_status()
or self.read_error_bits()):
return self.Status.BUSY, 'moving'
# TODO: handle the different errors from move_status and error_bits
diff = self.target - self.encoder
if abs(diff) <= self.tolerance:
self._started = 0
@ -273,9 +285,8 @@ class Motor(PersistentMixin, HasIodev, Drivable):
def write_target(self, target):
self.read_value() # make sure encoder and steppos are fresh
if self.maxcurrent >= self.safe_current + CURRENT_SCALE and (
abs(target - self.encoder) > self.move_limit + self.tolerance):
raise BadValueError('can not move more than %s deg %g %g' % (self.move_limit, self.encoder, target))
if abs(target - self.encoder) > self.move_limit:
raise BadValueError('can not move more than %s deg' % self.move_limit)
diff = self.encoder - self.steppos
if self._need_reset:
raise HardwareError('need reset (%s)' % self.status[1])
@ -284,91 +295,28 @@ class Motor(PersistentMixin, HasIodev, Drivable):
self._need_reset = True
self.status = self.Status.ERROR, 'encoder does not match internal pos'
raise HardwareError('need reset (encoder does not match internal pos)')
self.set('steppos', self.encoder - self.zero, check=False)
self._write_axispar(self.encoder - self.zero, STEPPOS_ADR, ANGLE_SCALE)
self._started = time.time()
self.log.debug('move to %.1f', target)
self.log.info('move to %.1f', target)
self.comm(MOVE, 0, (target - self.zero) / ANGLE_SCALE)
self.status = self.Status.BUSY, 'changed target'
return target
def write_zero(self, value):
diff = value - self.zero
self.encoder += diff
self.steppos += diff
self.value += diff
return value
self.zero = value
self.read_value() # apply zero to encoder, steppos and value
return Done
def read_encoder(self):
return self.get('encoder') + self.zero
return self._read_axispar(ENCODER_ADR, ANGLE_SCALE) + self.zero
def read_steppos(self):
return self.get('steppos') + self.zero
def read_encoder_tolerance(self):
return self.get('encoder_tolerance')
def write_encoder_tolerance(self, value):
return self.set('encoder_tolerance', value)
def read_target_reached(self):
return self.get('target_reached')
def read_speed(self):
return self.get('speed')
def write_speed(self, value):
return self.set('speed', value)
def read_minspeed(self):
return self.get('minspeed')
def write_minspeed(self, value):
return self.set('minspeed', value)
def read_currentspeed(self):
return self.get('currentspeed')
def read_acceleration(self):
return self.get('acceleration')
def write_acceleration(self, value):
return self.set('acceleration', value)
def read_maxcurrent(self):
return self.get('maxcurrent')
def write_maxcurrent(self, value):
return self.set('maxcurrent', value)
def read_standby_current(self):
return self.get('standby_current')
def write_standby_current(self, value):
return self.set('standby_current', value)
def read_free_wheeling(self):
return self.get('free_wheeling')
def write_free_wheeling(self, value):
return self.set('free_wheeling', value)
def read_power_down_delay(self):
return self.get('power_down_delay')
def write_power_down_delay(self, value):
return self.set('power_down_delay', value)
def read_move_status(self):
return self.get('move_status')
def read_error_bits(self):
return self.get('error_bits')
return self._read_axispar(STEPPOS_ADR, ANGLE_SCALE) + self.zero
@Command(FloatRange())
def set_zero(self, value):
"""adapt zero to make current position equal to given value"""
raw = self.read_value() - self.zero
self.write_zero(value - raw)
"""adjust zero"""
self.write_zero(value - self.read_value())
def read_baudrate(self):
return self.comm(GET_GLOB_PAR, 65)
@ -381,14 +329,14 @@ class Motor(PersistentMixin, HasIodev, Drivable):
"""set steppos to encoder value, if not within tolerance"""
if self._started:
raise IsBusyError('can not reset while moving')
tol = ENCODER_RESOLUTION
tol = ENCODER_RESOLUTION * 1.1
for itry in range(10):
diff = self.read_encoder() - self.read_steppos()
if abs(diff) <= tol:
self._need_reset = False
self.status = self.Status.IDLE, 'ok'
return
self.set('steppos', self.encoder - self.zero, check=False)
self._write_axispar(self.encoder - self.zero, STEPPOS_ADR, ANGLE_SCALE, readback=False)
self.comm(MOVE, 0, (self.encoder - self.zero) / ANGLE_SCALE)
time.sleep(0.1)
if itry > 5:
@ -398,23 +346,33 @@ class Motor(PersistentMixin, HasIodev, Drivable):
@Command()
def stop(self):
"""stop motor immediately"""
self.comm(MOTOR_STOP, 0)
self.status = self.Status.IDLE, 'stopped'
self.target = self.value # indicate to customers that this was stopped
self._started = 0
#@Command()
#def step(self):
# self.comm(MOVE, 1, FULL_STEP / ANGLE_SCALE)
@Command()
def step_forward(self):
"""move one full step forwards
#@Command()
#def back(self):
# self.comm(MOVE, 1, - FULL_STEP / ANGLE_SCALE)
for quick tests
"""
self.comm(MOVE, 1, FULL_STEP / ANGLE_SCALE)
#@Command(IntRange(), result=IntRange())
#def get_axis_par(self, adr):
# return self.comm(GET_AXIS_PAR, adr)
@Command()
def step_back(self):
"""move one full step backwards
#@Command((IntRange(), FloatRange()), result=IntRange())
#def set_axis_par(self, adr, value):
# return self.comm(SET_AXIS_PAR, adr, value)
for quick tests
"""
self.comm(MOVE, 1, - FULL_STEP / ANGLE_SCALE)
@Command(IntRange(), result=IntRange())
def get_axis_par(self, adr):
"""get arbitrary motor parameter"""
return self.comm(GET_AXIS_PAR, adr)
@Command((IntRange(), IntRange()), result=IntRange())
def set_axis_par(self, adr, value):
"""set arbitrary motor parameter"""
return self.comm(SET_AXIS_PAR, adr, value)