Merge branch 'wip' of gitlab.psi.ch:samenv/frappy into wip
This commit is contained in:
@ -25,6 +25,7 @@ import math
|
||||
import re
|
||||
import time
|
||||
|
||||
<<<<<<< HEAD
|
||||
from secop.core import Drivable, HasIodev, \
|
||||
Parameter, Property, Readable, StringIO
|
||||
from secop.datatypes import EnumType, FloatRange, StringType, StructOf
|
||||
@ -97,12 +98,153 @@ class MercuryChannel(HasIodev):
|
||||
|
||||
def read_channel_name(self):
|
||||
return self.query('')
|
||||
=======
|
||||
from secop.core import Drivable, HasIO, Writable, \
|
||||
Parameter, Property, Readable, StringIO, Attached, Done, IDLE, nopoll
|
||||
from secop.datatypes import EnumType, FloatRange, StringType, StructOf, BoolType
|
||||
from secop.errors import HardwareError
|
||||
from secop_psi.convergence import HasConvergence
|
||||
from secop.lib.enum import Enum
|
||||
|
||||
|
||||
VALUE_UNIT = re.compile(r'(.*\d|inf)([A-Za-z/%]*)$')
|
||||
SELF = 0
|
||||
|
||||
|
||||
def as_float(value):
|
||||
if isinstance(value, str):
|
||||
return float(VALUE_UNIT.match(value).group(1))
|
||||
return '%g' % value
|
||||
|
||||
|
||||
def as_string(value):
|
||||
return value
|
||||
|
||||
|
||||
class Mapped:
|
||||
def __init__(self, **kwds):
|
||||
self.mapping = kwds
|
||||
self.mapping.update({v: k for k, v in kwds.items()})
|
||||
|
||||
def __call__(self, value):
|
||||
return self.mapping[value]
|
||||
|
||||
|
||||
off_on = Mapped(OFF=False, ON=True)
|
||||
fast_slow = Mapped(ON=0, OFF=1) # maps OIs slow=ON/fast=OFF to sample_rate.slow=0/sample_rate.fast=1
|
||||
|
||||
|
||||
class IO(StringIO):
|
||||
identification = [('*IDN?', r'IDN:OXFORD INSTRUMENTS:MERCURY*')]
|
||||
|
||||
|
||||
class MercuryChannel(HasIO):
|
||||
slot = Property('''slot uids
|
||||
|
||||
example: DB6.T1,DB1.H1
|
||||
slot ids for sensor (and control output)''',
|
||||
StringType())
|
||||
channel_name = Parameter('mercury nick name', StringType(), default='')
|
||||
channel_type = '' #: channel type(s) for sensor (and control) e.g. TEMP,HTR or PRES,AUX
|
||||
|
||||
def _complete_adr(self, adr):
|
||||
"""complete address from channel_type and slot"""
|
||||
head, sep, tail = adr.partition(':')
|
||||
for i, (channel_type, slot) in enumerate(zip(self.channel_type.split(','), self.slot.split(','))):
|
||||
if head == str(i):
|
||||
return 'DEV:%s:%s%s%s' % (slot, channel_type, sep, tail)
|
||||
if head == channel_type:
|
||||
return 'DEV:%s:%s%s%s' % (slot, head, sep, tail)
|
||||
return adr
|
||||
|
||||
def multiquery(self, adr, names=(), convert=as_float):
|
||||
"""get parameter(s) in mercury syntax
|
||||
|
||||
:param adr: the 'address part' of the SCPI command
|
||||
the DEV:<slot> is added automatically, when adr starts with the channel type
|
||||
in addition, when adr starts with '0:' or '1:', channel type and slot are added
|
||||
:param names: the SCPI names of the parameter(s), for example ['TEMP']
|
||||
:param convert: a converter function (converts replied string to value)
|
||||
:return: the values as tuple
|
||||
|
||||
Example:
|
||||
adr='AUX:SIG'
|
||||
names = ('PERC',)
|
||||
self.channel_type='PRES,AUX' # adr starts with 'AUX'
|
||||
self.slot='DB5.P1,DB3.G1' # -> take second slot
|
||||
-> query command will be READ:DEV:DB3.G1:PRES:SIG:PERC
|
||||
"""
|
||||
adr = self._complete_adr(adr)
|
||||
cmd = 'READ:%s:%s' % (adr, ':'.join(names))
|
||||
reply = self.communicate(cmd)
|
||||
head = 'STAT:%s:' % adr
|
||||
try:
|
||||
assert reply.startswith(head)
|
||||
replyiter = iter(reply[len(head):].split(':'))
|
||||
keys, result = zip(*zip(replyiter, replyiter))
|
||||
assert keys == tuple(names)
|
||||
return tuple(convert(r) for r in result)
|
||||
except (AssertionError, AttributeError, ValueError):
|
||||
raise HardwareError('invalid reply %r to cmd %r' % (reply, cmd)) from None
|
||||
|
||||
def multichange(self, adr, values, convert=as_float):
|
||||
"""set parameter(s) in mercury syntax
|
||||
|
||||
:param adr: as in see multiquery method
|
||||
:param values: [(name1, value1), (name2, value2) ...]
|
||||
:param convert: a converter function (converts given value to string and replied string to value)
|
||||
:return: the values as tuple
|
||||
|
||||
Example:
|
||||
adr='0:LOOP'
|
||||
values = [('P', 5), ('I', 2), ('D', 0)]
|
||||
self.channel_type='TEMP,HTR' # adr starts with 0: take TEMP
|
||||
self.slot='DB6.T1,DB1.H1' # and take first slot
|
||||
-> change command will be SET:DEV:DB6.T1:TEMP:LOOP:P:5:I:2:D:0
|
||||
"""
|
||||
adr = self._complete_adr(adr)
|
||||
params = ['%s:%s' % (k, convert(v)) for k, v in values]
|
||||
cmd = 'SET:%s:%s' % (adr, ':'.join(params))
|
||||
reply = self.communicate(cmd)
|
||||
head = 'STAT:SET:%s:' % adr
|
||||
|
||||
try:
|
||||
assert reply.startswith(head)
|
||||
replyiter = iter(reply[len(head):].split(':'))
|
||||
keys, result, valid = zip(*zip(replyiter, replyiter, replyiter))
|
||||
assert keys == tuple(k for k, _ in values)
|
||||
assert any(v == 'VALID' for v in valid)
|
||||
return tuple(convert(r) for r in result)
|
||||
except (AssertionError, AttributeError, ValueError) as e:
|
||||
raise HardwareError('invalid reply %r to cmd %r' % (reply, cmd)) from e
|
||||
|
||||
def query(self, adr, convert=as_float):
|
||||
"""query a single parameter
|
||||
|
||||
'adr' and 'convert' areg
|
||||
"""
|
||||
adr, _, name = adr.rpartition(':')
|
||||
return self.multiquery(adr, [name], convert)[0]
|
||||
|
||||
def change(self, adr, value, convert=as_float):
|
||||
adr, _, name = adr.rpartition(':')
|
||||
return self.multichange(adr, [(name, value)], convert)[0]
|
||||
|
||||
def read_channel_name(self):
|
||||
if self.channel_name:
|
||||
return Done # channel name will not change
|
||||
return self.query('0:NICK', as_string)
|
||||
>>>>>>> d3379d5... support for OI mercury series
|
||||
|
||||
|
||||
class TemperatureSensor(MercuryChannel, Readable):
|
||||
channel_type = 'TEMP'
|
||||
value = Parameter(unit='K')
|
||||
<<<<<<< HEAD
|
||||
raw = Parameter('raw value', FloatRange())
|
||||
=======
|
||||
raw = Parameter('raw value', FloatRange(unit='Ohm'))
|
||||
>>>>>>> d3379d5... support for OI mercury series
|
||||
|
||||
def read_value(self):
|
||||
return self.query('TEMP:SIG:TEMP')
|
||||
@ -111,6 +253,7 @@ class TemperatureSensor(MercuryChannel, Readable):
|
||||
return self.query('TEMP:SIG:RES')
|
||||
|
||||
|
||||
<<<<<<< HEAD
|
||||
class HasProgressCheck:
|
||||
"""mixin for progress checks
|
||||
|
||||
@ -345,6 +488,252 @@ class TemperatureLoop(Loop, TemperatureSensor, Drivable):
|
||||
self.log.warning('switch to manual heater mode')
|
||||
self.write_mode('manual')
|
||||
return self.query('HTR:SIG:VOLT', math.sqrt(value * self.heater_resistivity))
|
||||
=======
|
||||
class HasInput(MercuryChannel):
|
||||
controlled_by = Parameter('source of target value', EnumType(members={'self': SELF}), default=0)
|
||||
target = Parameter(readonly=False)
|
||||
input_modules = ()
|
||||
|
||||
def add_input(self, modobj):
|
||||
if not self.input_modules:
|
||||
self.input_modules = []
|
||||
self.input_modules.append(modobj)
|
||||
prev_enum = self.parameters['controlled_by'].datatype._enum
|
||||
# add enum member, using autoincrement feature of Enum
|
||||
self.parameters['controlled_by'].datatype = EnumType(Enum(prev_enum, **{modobj.name: None}))
|
||||
|
||||
def write_controlled_by(self, value):
|
||||
if self.controlled_by == value:
|
||||
return Done
|
||||
self.controlled_by = value
|
||||
if value == SELF:
|
||||
self.log.warning('switch to manual mode')
|
||||
for input_module in self.input_modules:
|
||||
if input_module.control_active:
|
||||
input_module.write_control_active(False)
|
||||
return Done
|
||||
|
||||
|
||||
class Loop(HasConvergence, MercuryChannel, Drivable):
|
||||
"""common base class for loops"""
|
||||
control_active = Parameter('control mode', BoolType())
|
||||
output_module = Attached(HasInput, mandatory=False)
|
||||
ctrlpars = Parameter(
|
||||
'pid (proportional band, integral time, differential time',
|
||||
StructOf(p=FloatRange(0, unit='$'), i=FloatRange(0, unit='min'), d=FloatRange(0, unit='min')),
|
||||
readonly=False,
|
||||
)
|
||||
enable_pid_table = Parameter('', BoolType(), readonly=False)
|
||||
|
||||
def initModule(self):
|
||||
super().initModule()
|
||||
if self.output_module:
|
||||
self.output_module.add_input(self)
|
||||
|
||||
def set_output(self, active):
|
||||
if active:
|
||||
if self.output_module and self.output_module.controlled_by != self.name:
|
||||
self.output_module.controlled_by = self.name
|
||||
else:
|
||||
if self.output_module and self.output_module.controlled_by != SELF:
|
||||
self.output_module.write_controlled_by(SELF)
|
||||
status = IDLE, 'control inactive'
|
||||
if self.status != status:
|
||||
self.status = status
|
||||
|
||||
def set_target(self, target):
|
||||
if self.control_active:
|
||||
self.set_output(True)
|
||||
else:
|
||||
self.log.warning('switch loop control on')
|
||||
self.write_control_active(True)
|
||||
self.target = target
|
||||
self.start_state()
|
||||
|
||||
def read_enable_pid_table(self):
|
||||
return self.query('0:LOOP:PIDT', off_on)
|
||||
|
||||
def write_enable_pid_table(self, value):
|
||||
return self.change('0:LOOP:PIDT', value, off_on)
|
||||
|
||||
def read_ctrlpars(self):
|
||||
# read all in one go, in order to reduce comm. traffic
|
||||
pid = self.multiquery('0:LOOP', ('P', 'I', 'D'))
|
||||
return {k: float(v) for k, v in zip('pid', pid)}
|
||||
|
||||
def write_ctrlpars(self, value):
|
||||
pid = self.multichange('0:LOOP', [(k, value[k.lower()]) for k in 'PID'])
|
||||
return {k.lower(): v for k, v in zip('PID', pid)}
|
||||
|
||||
|
||||
class HeaterOutput(HasInput, MercuryChannel, Writable):
|
||||
"""heater output
|
||||
|
||||
Remark:
|
||||
The hardware calculates the power from the voltage and the configured
|
||||
resistivity. As the measured heater current is available, the resistivity
|
||||
will be adjusted automatically, when true_power is True.
|
||||
"""
|
||||
channel_type = 'HTR'
|
||||
value = Parameter('heater output', FloatRange(unit='W'), readonly=False)
|
||||
target = Parameter('heater output', FloatRange(0, 100, unit='$'), readonly=False)
|
||||
resistivity = Parameter('heater resistivity', FloatRange(10, 1000, unit='Ohm'),
|
||||
readonly=False)
|
||||
true_power = Parameter('calculate power from measured current', BoolType(), readonly=False, default=True)
|
||||
limit = Parameter('heater output limit', FloatRange(0, 1000, unit='W'), readonly=False)
|
||||
volt = 0.0 # target voltage
|
||||
_last_target = None
|
||||
_volt_target = None
|
||||
|
||||
def read_limit(self):
|
||||
return self.query('HTR:VLIM') ** 2 / self.resistivity
|
||||
|
||||
def write_limit(self, value):
|
||||
result = self.change('HTR:VLIM', math.sqrt(value * self.resistivity))
|
||||
return result ** 2 / self.resistivity
|
||||
|
||||
def read_resistivity(self):
|
||||
if self.true_power:
|
||||
return self.resistivity
|
||||
return max(10, self.query('HTR:RES'))
|
||||
|
||||
def write_resistivity(self, value):
|
||||
self.resistivity = self.change('HTR:RES', max(10, value))
|
||||
if self._last_target is not None:
|
||||
if not self.true_power:
|
||||
self._volt_target = math.sqrt(self._last_target * self.resistivity)
|
||||
self.change('HTR:SIG:VOLT', self._volt_target)
|
||||
return Done
|
||||
|
||||
def read_status(self):
|
||||
status = IDLE, ('true power' if self.true_power else 'fixed resistivity')
|
||||
if self.status != status:
|
||||
return status
|
||||
return Done
|
||||
|
||||
def read_value(self):
|
||||
if self._last_target is None: # on init
|
||||
self.read_target()
|
||||
if not self.true_power:
|
||||
volt = self.query('HTR:SIG:VOLT')
|
||||
return volt ** 2 / max(10, self.resistivity)
|
||||
volt, current = self.multiquery('HTR:SIG', ('VOLT', 'CURR'))
|
||||
if volt > 0 and current > 0.0001 and self._last_target:
|
||||
res = volt / current
|
||||
tol = res * max(max(0.0003, abs(volt - self._volt_target)) / volt, 0.0001 / current, 0.0001)
|
||||
if abs(res - self.resistivity) > tol + 0.07 and self._last_target:
|
||||
self.write_resistivity(round(res, 1))
|
||||
if self.controlled_by == 0:
|
||||
self._volt_target = math.sqrt(self._last_target * self.resistivity)
|
||||
self.change('HTR:SIG:VOLT', self._volt_target)
|
||||
return volt * current
|
||||
|
||||
def read_target(self):
|
||||
if self.controlled_by != 0 and self.target:
|
||||
return 0
|
||||
if self._last_target is not None:
|
||||
return Done
|
||||
self._volt_target = self.query('HTR:SIG:VOLT')
|
||||
self.resistivity = max(10, self.query('HTR:RES'))
|
||||
self._last_target = self._volt_target ** 2 / max(10, self.resistivity)
|
||||
return self._last_target
|
||||
|
||||
def set_target(self, value):
|
||||
"""set the target without switching to manual
|
||||
|
||||
might be used by a software loop
|
||||
"""
|
||||
self._volt_target = math.sqrt(value * self.resistivity)
|
||||
self.change('HTR:SIG:VOLT', self._volt_target)
|
||||
self._last_target = value
|
||||
return value
|
||||
|
||||
def write_target(self, value):
|
||||
self.write_controlled_by(SELF)
|
||||
return self.set_target(value)
|
||||
|
||||
|
||||
class TemperatureLoop(TemperatureSensor, Loop, Drivable):
|
||||
channel_type = 'TEMP,HTR'
|
||||
output_module = Attached(HeaterOutput, mandatory=False)
|
||||
ramp = Parameter('ramp rate', FloatRange(0, unit='K/min'), readonly=False)
|
||||
enable_ramp = Parameter('enable ramp rate', BoolType(), readonly=False)
|
||||
setpoint = Parameter('working setpoint (differs from target when ramping)', FloatRange(0, unit='$'))
|
||||
auto_flow = Parameter('enable auto flow', BoolType(), readonly=False)
|
||||
_last_setpoint_change = None
|
||||
|
||||
def doPoll(self):
|
||||
super().doPoll()
|
||||
self.read_setpoint()
|
||||
|
||||
def read_control_active(self):
|
||||
active = self.query('TEMP:LOOP:ENAB', off_on)
|
||||
self.set_output(active)
|
||||
return active
|
||||
|
||||
def write_control_active(self, value):
|
||||
self.set_output(value)
|
||||
return self.change('TEMP:LOOP:ENAB', value, off_on)
|
||||
|
||||
@nopoll # polled by read_setpoint
|
||||
def read_target(self):
|
||||
if self.read_enable_ramp():
|
||||
return self.target
|
||||
self.setpoint = self.query('TEMP:LOOP:TSET')
|
||||
return self.setpoint
|
||||
|
||||
def read_setpoint(self):
|
||||
setpoint = self.query('TEMP:LOOP:TSET')
|
||||
if self.enable_ramp:
|
||||
if setpoint == self.setpoint:
|
||||
# update target when working setpoint does no longer change
|
||||
if setpoint != self.target and self._last_setpoint_change is not None:
|
||||
unchanged_since = time.time() - self._last_setpoint_change
|
||||
if unchanged_since > max(12.0, 0.06 / max(1e-4, self.ramp)):
|
||||
self.target = self.setpoint
|
||||
return setpoint
|
||||
self._last_setpoint_change = time.time()
|
||||
else:
|
||||
self.target = setpoint
|
||||
return setpoint
|
||||
|
||||
def write_target(self, value):
|
||||
target = self.change('TEMP:LOOP:TSET', value)
|
||||
if self.enable_ramp:
|
||||
self._last_setpoint_change = None
|
||||
self.set_target(value)
|
||||
else:
|
||||
self.set_target(target)
|
||||
return Done
|
||||
|
||||
def read_enable_ramp(self):
|
||||
return self.query('TEMP:LOOP:RENA', off_on)
|
||||
|
||||
def write_enable_ramp(self, value):
|
||||
return self.change('TEMP:LOOP:RENA', value, off_on)
|
||||
|
||||
def read_auto_flow(self):
|
||||
return self.query('TEMP:LOOP:FAUT', off_on)
|
||||
|
||||
def write_auto_flow(self, value):
|
||||
return self.change('TEMP:LOOP:FAUT', value, off_on)
|
||||
|
||||
def read_ramp(self):
|
||||
result = self.query('TEMP:LOOP:RSET')
|
||||
return min(9e99, result)
|
||||
|
||||
def write_ramp(self, value):
|
||||
# use 0 or a very big value for switching off ramp
|
||||
if not value:
|
||||
self.write_enable_ramp(0)
|
||||
return 0
|
||||
if value >= 9e99:
|
||||
self.change('TEMP:LOOP:RSET', 'inf', as_string)
|
||||
self.write_enable_ramp(0)
|
||||
return 9e99
|
||||
self.write_enable_ramp(1)
|
||||
return self.change('TEMP:LOOP:RSET', max(1e-4, value))
|
||||
>>>>>>> d3379d5... support for OI mercury series
|
||||
|
||||
|
||||
class PressureSensor(MercuryChannel, Readable):
|
||||
@ -355,6 +744,7 @@ class PressureSensor(MercuryChannel, Readable):
|
||||
return self.query('PRES:SIG:PRES')
|
||||
|
||||
|
||||
<<<<<<< HEAD
|
||||
class PressureLoop(Loop, PressureSensor, Drivable):
|
||||
channel_type = 'PRES,AUX'
|
||||
|
||||
@ -381,6 +771,69 @@ class HeLevel(MercuryChannel, Readable):
|
||||
fast_timeout = Parameter('timeout for switching to slow', FloatRange(0, unit='sec'), readonly=False)
|
||||
_min_level = 200
|
||||
_max_level = -100
|
||||
=======
|
||||
class ValvePos(HasInput, MercuryChannel, Drivable):
|
||||
channel_type = 'PRES,AUX'
|
||||
value = Parameter('value pos', FloatRange(unit='%'), readonly=False)
|
||||
target = Parameter('valve pos target', FloatRange(0, 100, unit='$'), readonly=False)
|
||||
|
||||
def doPoll(self):
|
||||
self.read_status()
|
||||
|
||||
def read_value(self):
|
||||
return self.query('AUX:SIG:PERC')
|
||||
|
||||
def read_status(self):
|
||||
self.read_value()
|
||||
if abs(self.value - self.target) < 0.01:
|
||||
return 'IDLE', 'at target'
|
||||
return 'BUSY', 'moving'
|
||||
|
||||
def read_target(self):
|
||||
return self.query('PRES:LOOP:FSET')
|
||||
|
||||
def write_target(self, value):
|
||||
self.write_controlled_by(SELF)
|
||||
return self.change('PRES:LOOP:FSET', value)
|
||||
|
||||
|
||||
class PressureLoop(PressureSensor, Loop, Drivable):
|
||||
channel_type = 'PRES,AUX'
|
||||
output_module = Attached(ValvePos, mandatory=False)
|
||||
|
||||
def read_control_active(self):
|
||||
active = self.query('PRES:LOOP:FAUT', off_on)
|
||||
self.set_output(active)
|
||||
return active
|
||||
|
||||
def write_control_active(self, value):
|
||||
self.set_output(value)
|
||||
return self.change('PRES:LOOP:FAUT', value, off_on)
|
||||
|
||||
def read_target(self):
|
||||
return self.query('PRES:LOOP:PRST')
|
||||
|
||||
def write_target(self, value):
|
||||
target = self.change('PRES:LOOP:PRST', value)
|
||||
self.set_target(target)
|
||||
return Done
|
||||
|
||||
|
||||
class HeLevel(MercuryChannel, Readable):
|
||||
"""He level meter channel
|
||||
|
||||
The Mercury system does not support automatic switching between fast
|
||||
(when filling) and slow (when consuming). We have to handle this by software.
|
||||
"""
|
||||
channel_type = 'LVL'
|
||||
sample_rate = Parameter('_', EnumType(slow=0, fast=1), readonly=False)
|
||||
hysteresis = Parameter('hysteresis for detection of increase', FloatRange(0, 100, unit='%'),
|
||||
default=5, readonly=False)
|
||||
fast_timeout = Parameter('time to switch to slow after last increase', FloatRange(0, unit='sec'),
|
||||
default=300, readonly=False)
|
||||
_min_level = 999
|
||||
_max_level = -999
|
||||
>>>>>>> d3379d5... support for OI mercury series
|
||||
_last_increase = None # None when in slow mode, last increase time in fast mode
|
||||
|
||||
def check_rate(self, sample_rate):
|
||||
@ -392,6 +845,7 @@ class HeLevel(MercuryChannel, Readable):
|
||||
if sample_rate != 0: # fast
|
||||
if not self._last_increase:
|
||||
self._last_increase = time.time()
|
||||
<<<<<<< HEAD
|
||||
self._max_level = -100
|
||||
elif self._last_increase:
|
||||
self._last_increase = None
|
||||
@ -404,6 +858,20 @@ class HeLevel(MercuryChannel, Readable):
|
||||
def write_sample_rate(self, value):
|
||||
self.check_rate(value)
|
||||
return SAMPLE_RATE[self.query('LVL:HEL:PULS:SLOW', SAMPLE_RATE[value])]
|
||||
=======
|
||||
self._max_level = -999
|
||||
elif self._last_increase:
|
||||
self._last_increase = None
|
||||
self._min_level = 999
|
||||
return sample_rate
|
||||
|
||||
def read_sample_rate(self):
|
||||
return self.check_rate(self.query('LVL:HEL:PULS:SLOW', fast_slow))
|
||||
|
||||
def write_sample_rate(self, value):
|
||||
self.check_rate(value)
|
||||
return self.change('LVL:HEL:PULS:SLOW', value, fast_slow)
|
||||
>>>>>>> d3379d5... support for OI mercury series
|
||||
|
||||
def read_value(self):
|
||||
level = self.query('LVL:SIG:HEL:LEV')
|
||||
@ -415,11 +883,19 @@ class HeLevel(MercuryChannel, Readable):
|
||||
self._max_level = level
|
||||
elif now > self._last_increase + self.fast_timeout:
|
||||
# no increase since fast timeout -> slow
|
||||
<<<<<<< HEAD
|
||||
self.write_sample_rate('slow')
|
||||
else:
|
||||
if level > self._min_level + self.hysteresis:
|
||||
# substantial increase -> fast
|
||||
self.write_sample_rate('fast')
|
||||
=======
|
||||
self.write_sample_rate(self.sample_rate.slow)
|
||||
else:
|
||||
if level > self._min_level + self.hysteresis:
|
||||
# substantial increase -> fast
|
||||
self.write_sample_rate(self.sample_rate.fast)
|
||||
>>>>>>> d3379d5... support for OI mercury series
|
||||
else:
|
||||
self._min_level = min(self._min_level, level)
|
||||
return level
|
||||
@ -432,5 +908,9 @@ class N2Level(MercuryChannel, Readable):
|
||||
return self.query('LVL:SIG:NIT:LEV')
|
||||
|
||||
|
||||
<<<<<<< HEAD
|
||||
class MagnetOutput(MercuryChannel, Drivable):
|
||||
pass
|
||||
=======
|
||||
# TODO: magnet power supply
|
||||
>>>>>>> d3379d5... support for OI mercury series
|
||||
|
267
secop_psi/motorvalve.py
Normal file
267
secop_psi/motorvalve.py
Normal file
@ -0,0 +1,267 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# *****************************************************************************
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify it under
|
||||
# the terms of the GNU General Public License as published by the Free Software
|
||||
# Foundation; either version 2 of the License, or (at your option) any later
|
||||
# version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
# details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along with
|
||||
# this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
#
|
||||
# Module authors:
|
||||
# Markus Zolliker <markus.zolliker@psi.ch>
|
||||
#
|
||||
# *****************************************************************************
|
||||
|
||||
"""motor valve using a trinamic PD-1161 motor
|
||||
|
||||
the valve has an end switch connected to the 'home' digital input
|
||||
of the motor controller. Motor settings for the currently used valve:
|
||||
|
||||
[valve_motor]
|
||||
description = valve motor
|
||||
class = secop_psi.trinamic.Motor
|
||||
maxcurrent=0.3 # a value corresponding to the torque needed to firmly close the valve.
|
||||
move_limit=9999 # no move limit needed
|
||||
acceleration=150
|
||||
encoder_tolerance=3.6 # typical value
|
||||
auto_reset=True # motor stalls on closing
|
||||
|
||||
[valve]
|
||||
description = trinamic angular motor valve
|
||||
class = secop_psi.motorvalve.MotorValve
|
||||
motor = valve_motor
|
||||
turns = 9 # number of turns needed to open fully
|
||||
speed = 400 # close to max. speed
|
||||
lowspeed = 50 # speed for final closing / reference run
|
||||
"""
|
||||
|
||||
from secop.core import Drivable, Parameter, EnumType, Attached, FloatRange, \
|
||||
Command, IDLE, BUSY, WARN, ERROR, Done, PersistentParam, PersistentMixin
|
||||
from secop.errors import HardwareError
|
||||
from secop_psi.trinamic import Motor
|
||||
from secop.lib.statemachine import StateMachine, Retry, Stop
|
||||
|
||||
|
||||
class MotorValve(PersistentMixin, Drivable):
|
||||
motor = Attached(Motor)
|
||||
value = Parameter('current state', EnumType(
|
||||
closed=0, opened=1, undefined=-1), default=-1)
|
||||
target = Parameter('target state', EnumType(close=0, open=1))
|
||||
turns = Parameter('number of turns to open', FloatRange(), readonly=False, group='settings')
|
||||
speed = Parameter('speed for far moves', FloatRange(), readonly=False, group='settings')
|
||||
lowspeed = Parameter('speed for finding closed position', FloatRange(), readonly=False, group='settings')
|
||||
closed_pos = PersistentParam('fully closed position', FloatRange(),
|
||||
persistent='auto', export=True, default=-999) # TODO: export=False
|
||||
pollinterval = Parameter(group='settings')
|
||||
|
||||
_state = None
|
||||
|
||||
# remark: the home button must be touched when the motor is at zero
|
||||
|
||||
def earlyInit(self):
|
||||
super().earlyInit()
|
||||
self._state = StateMachine(logger=self.log, count=3, cleanup=self.handle_error)
|
||||
|
||||
def write_target(self, target):
|
||||
if self.status[0] == ERROR:
|
||||
raise HardwareError('%s: need refrun' % self.status[1])
|
||||
self.target = target
|
||||
self._state.start(self.goto_target, count=3)
|
||||
return Done
|
||||
|
||||
def goto_target(self, state):
|
||||
self.value = 'undefined'
|
||||
if self.motor.isBusy():
|
||||
mot_target = 0 if self.target == self.target.close else self.turns * 360
|
||||
if abs(mot_target - self.motor.target) > self.motor.tolerance:
|
||||
self.motor.stop()
|
||||
return self.open_valve if self.target == self.target.open else self.close_valve
|
||||
|
||||
def read_value(self):
|
||||
"""determine value and status"""
|
||||
if self.status[0] == ERROR:
|
||||
return 'undefined'
|
||||
if self.motor.isBusy():
|
||||
return Done
|
||||
motpos = self.motor.read_value()
|
||||
if self.motor.read_home():
|
||||
if motpos > 360:
|
||||
self.status = ERROR, 'home button must be released at this position'
|
||||
elif motpos > 5:
|
||||
if self.status[0] != ERROR:
|
||||
self.status = WARN, 'position undefined'
|
||||
elif motpos < -360:
|
||||
self.status = ERROR, 'motor must not reach -1 turn!'
|
||||
elif abs(motpos - self.closed_pos) < self.motor.tolerance:
|
||||
self.status = IDLE, 'closed'
|
||||
return 'closed'
|
||||
self.status = WARN, 'nearly closed'
|
||||
return 'undefined'
|
||||
if abs(motpos - self.turns * 360) < 5:
|
||||
self.status = IDLE, 'opened'
|
||||
return 'opened'
|
||||
if motpos < 5:
|
||||
self.status = ERROR, 'home button must be engaged at this position'
|
||||
elif self.status[0] != ERROR:
|
||||
self.status = WARN, 'position undefined'
|
||||
return 'undefined'
|
||||
|
||||
def open_valve(self, state):
|
||||
if state.init:
|
||||
self.closed_pos = -999
|
||||
self.value = 'undefined'
|
||||
self.status = BUSY, 'opening'
|
||||
self.motor.write_speed(self.speed)
|
||||
self.motor.write_target(self.turns * 360)
|
||||
if self.motor.isBusy():
|
||||
if self.motor.home and self.motor.value > 360:
|
||||
self.motor.stop()
|
||||
self.status = ERROR, 'opening valve failed (home switch not released)'
|
||||
return None
|
||||
return Retry()
|
||||
motvalue = self.motor.read_value()
|
||||
if abs(motvalue - self.turns * 360) < 5:
|
||||
self.read_value() # value = opened, status = IDLE
|
||||
else:
|
||||
if state.count > 0:
|
||||
state.count -= 1
|
||||
self.log.info('target %g not reached, try again', motvalue)
|
||||
return self.goto_target
|
||||
self.status = ERROR, 'opening valve failed (motor target not reached)'
|
||||
return None
|
||||
|
||||
def close_valve(self, state):
|
||||
if state.init:
|
||||
self.closed_pos = -999
|
||||
self.status = BUSY, 'closing'
|
||||
self.motor.write_speed(self.speed)
|
||||
self.motor.write_target(0)
|
||||
if self.motor.isBusy():
|
||||
if self.motor.home:
|
||||
return self.find_closed
|
||||
return Retry()
|
||||
motvalue = self.motor.read_value()
|
||||
if abs(motvalue) > 5:
|
||||
if state.count > 0:
|
||||
state.count -= 1
|
||||
self.log.info('target %g not reached, try again', motvalue)
|
||||
return self.goto_target
|
||||
self.status = ERROR, 'closing valve failed (zero not reached)'
|
||||
return None
|
||||
if self.read_value() == self.value.undefined:
|
||||
if self.status[0] != ERROR:
|
||||
return self.find_closed
|
||||
return None
|
||||
|
||||
def find_closed(self, state):
|
||||
"""drive with low speed until motor stalls"""
|
||||
if state.init:
|
||||
self.motor.write_speed(self.lowspeed)
|
||||
state.prev = self.motor.value
|
||||
self.motor.write_target(-360)
|
||||
if self.motor.isBusy():
|
||||
if not self.motor.home:
|
||||
self.motor.stop()
|
||||
return None
|
||||
return Retry()
|
||||
motvalue = self.motor.read_value()
|
||||
if motvalue < -360:
|
||||
self.read_value() # status -> error
|
||||
return None
|
||||
if motvalue < state.prev - 5:
|
||||
# moved by more than 5 deg
|
||||
state.prev = self.motor.value
|
||||
self.motor.write_target(-360)
|
||||
return Retry()
|
||||
if motvalue > 5:
|
||||
self.status = ERROR, 'closing valve failed (zero not reached)'
|
||||
return None
|
||||
if motvalue < -355:
|
||||
self.status = ERROR, 'closing valve failed (does not stop)'
|
||||
return None
|
||||
self.closed_pos = motvalue
|
||||
self.read_value() # value = closed, status = IDLE
|
||||
return None
|
||||
|
||||
@Command
|
||||
def ref_run(self):
|
||||
"""start reference run"""
|
||||
self.target = 'close'
|
||||
self._state.start(self.ref_home, count=3)
|
||||
|
||||
@Command
|
||||
def stop(self):
|
||||
self._state.stop()
|
||||
self.motor.stop()
|
||||
|
||||
def ref_home(self, state):
|
||||
if state.init:
|
||||
self.closed_pos = -999
|
||||
self.motor.write_speed(self.lowspeed)
|
||||
if self.motor.read_home():
|
||||
self.status = BUSY, 'refrun: release home'
|
||||
self.motor.write_target(self.motor.read_value() + 360)
|
||||
return self.ref_released
|
||||
self.status = BUSY, 'refrun: find home'
|
||||
self.motor.write_target(self.motor.read_value() - (self.turns + 1) * 360)
|
||||
if not self.motor.isBusy():
|
||||
self.status = ERROR, 'ref run failed, can not find home switch'
|
||||
return None
|
||||
if not self.motor.home:
|
||||
return Retry()
|
||||
self.motor.write_speed(self.lowspeed)
|
||||
state.prev = self.motor.read_value()
|
||||
self.motor.write_target(state.prev - 360)
|
||||
self.status = BUSY, 'refrun: find closed'
|
||||
return self.ref_closed
|
||||
|
||||
def ref_released(self, state):
|
||||
if self.motor.isBusy():
|
||||
if self.motor.home:
|
||||
return Retry()
|
||||
elif self.motor.read_home():
|
||||
if state.count > 0:
|
||||
state.count -= 1
|
||||
self.log.info('home switch not released, try again')
|
||||
self.motor.write_target(self.motor.target)
|
||||
return Retry()
|
||||
self.status = ERROR, 'ref run failed, can not release home switch'
|
||||
return None
|
||||
return self.ref_home
|
||||
|
||||
def ref_closed(self, state):
|
||||
if self.motor.isBusy():
|
||||
if not self.motor.home:
|
||||
self.motor.stop()
|
||||
return None
|
||||
return Retry()
|
||||
self.motor.set_zero(max(-50, (self.motor.read_value() - state.prev) * 0.5))
|
||||
self.read_value() # check home button is valid
|
||||
if abs(self.motor.target - self.motor.value) < 5:
|
||||
self.status = ERROR, 'ref run failed, does not stop'
|
||||
if self.status[0] == ERROR:
|
||||
return None
|
||||
self.log.info('refrun successful')
|
||||
return self.close_valve
|
||||
|
||||
def handle_error(self, state):
|
||||
if state.stopped: # stop or restart case
|
||||
if state.stopped is Stop:
|
||||
self.status = WARN, 'stopped'
|
||||
return None
|
||||
if state.count > 0:
|
||||
state.count -= 1
|
||||
self.log.info('error %r, try again', state.last_error)
|
||||
state.default_cleanup(state) # log error cause
|
||||
state.last_error = None
|
||||
return self.goto_target # try again
|
||||
self.status = ERROR, str(state.last_error)
|
||||
return state.default_cleanup(state)
|
@ -406,7 +406,9 @@ class SeaModule(Module):
|
||||
# with open(join(seaconfdir, json_file + '.json')) as fp:
|
||||
# sea_object, descr = json.load(fp)
|
||||
with open(join(seaconfdir, json_file)) as fp:
|
||||
descr = json.load(fp)[sea_object]
|
||||
content = json.load(fp)
|
||||
# print(json_file, content.keys())
|
||||
descr = content[sea_object]
|
||||
if rel_paths == '*' or not rel_paths:
|
||||
# take all
|
||||
main = descr['params'][0]
|
||||
@ -419,13 +421,14 @@ class SeaModule(Module):
|
||||
# filter by relative paths
|
||||
rel_paths = rel_paths.split()
|
||||
result = []
|
||||
is_running = None
|
||||
for rpath in rel_paths:
|
||||
include = True
|
||||
for paramdesc in descr['params']:
|
||||
path = paramdesc['path']
|
||||
if path.endswith('is_running'):
|
||||
# take this always
|
||||
result.append(paramdesc)
|
||||
is_running = paramdesc
|
||||
continue
|
||||
if paramdesc.get('visibility', 1) > visibility_level:
|
||||
continue
|
||||
@ -437,6 +440,8 @@ class SeaModule(Module):
|
||||
result.append(paramdesc)
|
||||
elif sub[0] == rpath:
|
||||
result.append(paramdesc)
|
||||
if is_running: # take this at end
|
||||
result.append(is_running)
|
||||
descr['params'] = result
|
||||
rel0 = '' if rel_paths[0] == '.' else rel_paths[0]
|
||||
if result[0]['path'] == rel0:
|
||||
|
@ -191,7 +191,7 @@ class Sensor(Readable):
|
||||
|
||||
def initModule(self):
|
||||
super().initModule()
|
||||
self._rawsensor.registerCallbacks(self, ['status']) # auto update status
|
||||
self.rawsensor.registerCallbacks(self, ['status']) # auto update status
|
||||
self._calib = CalCurve(self.calib)
|
||||
if self.description == '_':
|
||||
self.description = '%r calibrated with curve %r' % (self.rawsensor, self.calib)
|
||||
@ -220,4 +220,4 @@ class Sensor(Readable):
|
||||
self.status = self.Status.ERROR, self._value_error
|
||||
|
||||
def read_value(self):
|
||||
return self._calib(self._rawsensor.read_value())
|
||||
return self._calib(self.rawsensor.read_value())
|
||||
|
@ -26,10 +26,12 @@ import time
|
||||
import struct
|
||||
|
||||
from secop.core import BoolType, Command, EnumType, FloatRange, IntRange, \
|
||||
HasIO, Parameter, Property, Drivable, PersistentMixin, PersistentParam, Done
|
||||
HasIO, Parameter, Property, Drivable, PersistentMixin, PersistentParam, Done, \
|
||||
IDLE, BUSY, ERROR
|
||||
from secop.io import BytesIO
|
||||
from secop.errors import CommunicationFailedError, HardwareError, BadValueError, IsBusyError
|
||||
from secop.rwhandler import ReadHandler, WriteHandler
|
||||
from secop.lib import formatStatusBits
|
||||
|
||||
MOTOR_STOP = 3
|
||||
MOVE = 4
|
||||
@ -37,6 +39,8 @@ SET_AXIS_PAR = 5
|
||||
GET_AXIS_PAR = 6
|
||||
SET_GLOB_PAR = 9
|
||||
GET_GLOB_PAR = 10
|
||||
SET_IO = 14
|
||||
GET_IO = 15
|
||||
# STORE_GLOB_PAR = 11
|
||||
|
||||
BAUDRATES = [9600, 0, 19200, 0, 38400, 57600, 0, 115200]
|
||||
@ -80,7 +84,8 @@ def writable(*args, **kwds):
|
||||
class Motor(PersistentMixin, HasIO, Drivable):
|
||||
address = Property('module address', IntRange(0, 255), default=1)
|
||||
|
||||
value = Parameter('motor position', FloatRange(unit='deg', fmtstr='%.3f'))
|
||||
value = Parameter('motor position', FloatRange(unit='deg', fmtstr='%.3f'),
|
||||
needscfg=False)
|
||||
zero = PersistentParam('zero point', FloatRange(unit='$'), readonly=False, default=0)
|
||||
encoder = PersistentParam('encoder reading', FloatRange(unit='$', fmtstr='%.1f'),
|
||||
readonly=True, initwrite=False)
|
||||
@ -88,12 +93,17 @@ class Motor(PersistentMixin, HasIO, Drivable):
|
||||
readonly=True, initwrite=False)
|
||||
target = Parameter('', FloatRange(unit='$'), default=0)
|
||||
|
||||
move_limit = Parameter('max. angle to drive in one go', FloatRange(unit='$'),
|
||||
move_limit = Parameter('max. angle to drive in one go when current above safe_current',
|
||||
FloatRange(unit='$'),
|
||||
readonly=False, default=360, group='more')
|
||||
safe_current = Parameter('motor current allowed for big steps', FloatRange(unit='A'),
|
||||
readonly=False, default=0.2, group='more')
|
||||
tolerance = Parameter('positioning tolerance', FloatRange(unit='$'),
|
||||
readonly=False, default=0.9)
|
||||
encoder_tolerance = writable('the allowed deviation between steppos and encoder\n\nmust be > tolerance',
|
||||
FloatRange(0, 360., unit='$', fmtstr='%.3f'), group='more')
|
||||
has_encoder = Parameter('whether encoder is used or not', BoolType(),
|
||||
readonly=False, default=True, group='more')
|
||||
speed = writable('max. speed', FloatRange(0, MAX_SPEED, unit='$/sec', fmtstr='%.1f'), default=40)
|
||||
minspeed = writable('min. speed', FloatRange(0, MAX_SPEED, unit='$/sec', fmtstr='%.1f'),
|
||||
default=SPEED_SCALE, group='motorparam')
|
||||
@ -106,10 +116,16 @@ class Motor(PersistentMixin, HasIO, Drivable):
|
||||
acceleration = writable('', FloatRange(4.6 * ACCEL_SCALE, MAX_ACCEL, unit='deg/s^2', fmtstr='%.1f'),
|
||||
default=150., group='motorparam')
|
||||
target_reached = Parameter('', BoolType(), group='hwstatus')
|
||||
move_status = Parameter('', IntRange(0, 3), group='hwstatus')
|
||||
move_status = Parameter('', EnumType(ok=0, stalled=1, encoder_deviation=2, stalled_and_encoder_deviation=3),
|
||||
group='hwstatus')
|
||||
error_bits = Parameter('', IntRange(0, 255), group='hwstatus')
|
||||
home = Parameter('state of home switch (input 3)', BoolType())
|
||||
has_home = Parameter('enable home and activate pullup resistor', BoolType(),
|
||||
default=True, initwrite=True, group='more')
|
||||
auto_reset = Parameter('automatic reset after failure', BoolType(), readonly=False, default=False)
|
||||
free_wheeling = writable('', FloatRange(0, 60., unit='sec', fmtstr='%.2f'),
|
||||
default=0.1, group='motorparam')
|
||||
|
||||
power_down_delay = writable('', FloatRange(0, 60., unit='sec', fmtstr='%.2f'),
|
||||
default=0.1, group='motorparam')
|
||||
baudrate = Parameter('', EnumType({'%d' % v: i for i, v in enumerate(BAUDRATES)}),
|
||||
@ -117,22 +133,24 @@ class Motor(PersistentMixin, HasIO, Drivable):
|
||||
pollinterval = Parameter(group='more')
|
||||
|
||||
ioClass = BytesIO
|
||||
fast_pollfactor = 0.001 # not used any more, TODO: use a statemachine for running
|
||||
_started = 0
|
||||
_calcTimeout = True
|
||||
fast_poll = 0.05
|
||||
_run_mode = 0 # 0: idle, 1: driving, 2: stopping
|
||||
_calc_timeout = True
|
||||
_need_reset = None
|
||||
_last_change = 0
|
||||
_loading = False # True when loading parameters
|
||||
|
||||
def comm(self, cmd, adr, value=0, bank=0):
|
||||
"""set or get a parameter
|
||||
|
||||
:param adr: parameter number
|
||||
:param cmd: SET command (in the GET case, 1 is added to this)
|
||||
:param cmd: instruction number (SET_* or GET_*)
|
||||
:param bank: the bank
|
||||
:param value: if given, the parameter is written, else it is returned
|
||||
:return: the returned value
|
||||
"""
|
||||
if self._calcTimeout and self.io._conn:
|
||||
self._calcTimeout = False
|
||||
if self._calc_timeout and self.io._conn:
|
||||
self._calc_timeout = False
|
||||
baudrate = getattr(self.io._conn.connection, 'baudrate', None)
|
||||
if baudrate:
|
||||
if baudrate not in BAUDRATES:
|
||||
@ -167,6 +185,8 @@ class Motor(PersistentMixin, HasIO, Drivable):
|
||||
super().startModule(start_events)
|
||||
|
||||
def fix_encoder(self=self):
|
||||
if not self.has_encoder:
|
||||
return
|
||||
try:
|
||||
# get encoder value from motor. at this stage self.encoder contains the persistent value
|
||||
encoder = self._read_axispar(ENCODER_ADR, ANGLE_SCALE) + self.zero
|
||||
@ -174,7 +194,8 @@ class Motor(PersistentMixin, HasIO, Drivable):
|
||||
except Exception as e:
|
||||
self.log.error('fix_encoder failed with %r', e)
|
||||
|
||||
start_events.queue(fix_encoder)
|
||||
if self.has_encoder:
|
||||
start_events.queue(fix_encoder)
|
||||
|
||||
def fix_encoder(self, encoder_from_hw):
|
||||
"""fix encoder value
|
||||
@ -194,7 +215,7 @@ class Motor(PersistentMixin, HasIO, Drivable):
|
||||
if adjusted_encoder != encoder_from_hw:
|
||||
self.log.info('take next closest encoder value (%.2f)' % adjusted_encoder)
|
||||
self._need_reset = True
|
||||
self.status = self.Status.ERROR, 'saved encoder value does not match reading'
|
||||
self.status = ERROR, 'saved encoder value does not match reading'
|
||||
self._write_axispar(adjusted_encoder - self.zero, ENCODER_ADR, ANGLE_SCALE, readback=False)
|
||||
|
||||
def _read_axispar(self, adr, scale=1):
|
||||
@ -235,71 +256,123 @@ class Motor(PersistentMixin, HasIO, Drivable):
|
||||
"""handler write for HwParam"""
|
||||
return self._write_axispar(value, *HW_ARGS[pname])
|
||||
|
||||
def doPoll(self):
|
||||
self.read_status() # read_value is called by read_status
|
||||
|
||||
def read_value(self):
|
||||
encoder = self.read_encoder()
|
||||
steppos = self.read_steppos()
|
||||
encoder = self.read_encoder() if self.has_encoder else steppos
|
||||
if self.has_home:
|
||||
self.read_home()
|
||||
initialized = self.comm(GET_GLOB_PAR, 255, bank=2)
|
||||
if initialized: # no power loss
|
||||
self.saveParameters()
|
||||
else: # just powered up
|
||||
# get persistent values
|
||||
writeDict = self.loadParameters()
|
||||
elif not self._loading: # just powered up
|
||||
try:
|
||||
self._loading = True
|
||||
# get persistent values
|
||||
writeDict = self.loadParameters()
|
||||
finally:
|
||||
self._loading = False
|
||||
self.log.info('set to previous saved values %r', writeDict)
|
||||
# self.encoder now contains the last known (persistent) value
|
||||
if self._need_reset is None:
|
||||
if self.status[0] == self.Status.IDLE:
|
||||
if self.status[0] == IDLE:
|
||||
# server started, power cycled and encoder value matches last one
|
||||
self.reset()
|
||||
else:
|
||||
self.fix_encoder(encoder)
|
||||
if self.has_encoder:
|
||||
self.fix_encoder(encoder)
|
||||
self._need_reset = True
|
||||
self.status = self.Status.ERROR, 'power loss'
|
||||
self.status = ERROR, 'power loss'
|
||||
# or should we just fix instead of error status?
|
||||
# self._write_axispar(self.steppos - self.zero, readback=False)
|
||||
self.comm(SET_GLOB_PAR, 255, 1, bank=2) # set initialized flag
|
||||
self._started = 0
|
||||
self._run_mode = 0
|
||||
self.setFastPoll(False)
|
||||
|
||||
return encoder if abs(encoder - steppos) > self.tolerance else steppos
|
||||
|
||||
def read_status(self):
|
||||
oldpos = self.steppos
|
||||
self.read_value() # make sure encoder and steppos are fresh
|
||||
if not self._started:
|
||||
if abs(self.encoder - self.steppos) > self.encoder_tolerance:
|
||||
if not self._run_mode:
|
||||
if self.has_encoder and abs(self.encoder - self.steppos) > self.encoder_tolerance:
|
||||
self._need_reset = True
|
||||
if self.status[0] != self.Status.ERROR:
|
||||
if self.auto_reset:
|
||||
return IDLE, 'encoder does not match internal pos'
|
||||
if self.status[0] != ERROR:
|
||||
self.log.error('encoder (%.2f) does not match internal pos (%.2f)', self.encoder, self.steppos)
|
||||
return self.Status.ERROR, 'encoder does not match internal pos'
|
||||
return ERROR, 'encoder does not match internal pos'
|
||||
return self.status
|
||||
if oldpos != self.steppos or not (self.read_target_reached() or self.read_move_status()
|
||||
or self.read_error_bits()):
|
||||
return self.Status.BUSY, 'moving'
|
||||
# TODO: handle the different errors from move_status and error_bits
|
||||
now = self.parameters['steppos'].timestamp
|
||||
if self.steppos != oldpos:
|
||||
self._last_change = now
|
||||
return BUSY, 'stopping' if self._run_mode == 2 else 'moving'
|
||||
if now < self._last_change + 0.3 and not (self.read_target_reached() or self.read_move_status()):
|
||||
return BUSY, 'stopping' if self._run_mode == 2 else 'moving'
|
||||
if self.target_reached:
|
||||
reason = ''
|
||||
elif self.move_status:
|
||||
reason = self.move_status.name
|
||||
elif self.error_bits:
|
||||
reason = formatStatusBits(self.error_bits, (
|
||||
'stallGuard', 'over_temp', 'over_temp_warn', 'short_A', 'short_B',
|
||||
'open_load_A', 'open_load_B', 'standstill'))
|
||||
else:
|
||||
reason = 'unknown'
|
||||
self.setFastPoll(False)
|
||||
if self._run_mode == 2:
|
||||
self.target = self.value # indicate to customers that this was stopped
|
||||
self._run_mode = 0
|
||||
return IDLE, 'stopped'
|
||||
self._run_mode = 0
|
||||
diff = self.target - self.encoder
|
||||
if abs(diff) <= self.tolerance:
|
||||
self._started = 0
|
||||
return self.Status.IDLE, ''
|
||||
self.log.error('out of tolerance by %.3g', diff)
|
||||
self._started = 0
|
||||
return self.Status.ERROR, 'out of tolerance'
|
||||
if reason:
|
||||
self.log.warning('target reached, but move_status = %s', reason)
|
||||
return IDLE, ''
|
||||
if self.auto_reset:
|
||||
self._need_reset = True
|
||||
return IDLE, 'stalled: %s' % reason
|
||||
self.log.error('out of tolerance by %.3g (%s)', diff, reason)
|
||||
return ERROR, 'out of tolerance (%s)' % reason
|
||||
|
||||
def write_target(self, target):
|
||||
self.read_value() # make sure encoder and steppos are fresh
|
||||
if abs(target - self.encoder) > self.move_limit:
|
||||
raise BadValueError('can not move more than %s deg' % self.move_limit)
|
||||
diff = self.encoder - self.steppos
|
||||
if self._need_reset:
|
||||
raise HardwareError('need reset (%s)' % self.status[1])
|
||||
for _ in range(2): # for auto reset
|
||||
self.read_value() # make sure encoder and steppos are fresh
|
||||
if self.maxcurrent >= self.safe_current + CURRENT_SCALE and (
|
||||
abs(target - self.encoder) > self.move_limit + self.tolerance):
|
||||
# pylint: disable=bad-string-format-type
|
||||
# pylint wrongly does not recognise encoder as a descriptor
|
||||
raise BadValueError('can not move more than %g deg (%g -> %g)' %
|
||||
(self.move_limit, self.encoder, target))
|
||||
diff = self.encoder - self.steppos
|
||||
if self._need_reset:
|
||||
if self.auto_reset:
|
||||
if self.isBusy():
|
||||
self.stop()
|
||||
while self.isBusy():
|
||||
time.sleep(0.1)
|
||||
self.read_value()
|
||||
self.reset()
|
||||
if self.status[0] == IDLE:
|
||||
continue
|
||||
raise HardwareError('auto reset failed')
|
||||
raise HardwareError('need reset (%s)' % self.status[1])
|
||||
break
|
||||
if abs(diff) > self.tolerance:
|
||||
if abs(diff) > self.encoder_tolerance:
|
||||
if abs(diff) > self.encoder_tolerance and self.has_encoder:
|
||||
self._need_reset = True
|
||||
self.status = self.Status.ERROR, 'encoder does not match internal pos'
|
||||
self.status = ERROR, 'encoder does not match internal pos'
|
||||
raise HardwareError('need reset (encoder does not match internal pos)')
|
||||
self._write_axispar(self.encoder - self.zero, STEPPOS_ADR, ANGLE_SCALE)
|
||||
self._started = time.time()
|
||||
self.log.info('move to %.1f', target)
|
||||
self._write_axispar(self.encoder - self.zero, STEPPOS_ADR, ANGLE_SCALE, readback=False)
|
||||
self._last_change = time.time()
|
||||
self._run_mode = 1 # driving
|
||||
self.setFastPoll(True, self.fast_poll)
|
||||
self.log.debug('move to %.1f', target)
|
||||
self.comm(MOVE, 0, (target - self.zero) / ANGLE_SCALE)
|
||||
self.status = self.Status.BUSY, 'changed target'
|
||||
self.status = BUSY, 'changed target'
|
||||
return target
|
||||
|
||||
def write_zero(self, value):
|
||||
@ -308,71 +381,67 @@ class Motor(PersistentMixin, HasIO, Drivable):
|
||||
return Done
|
||||
|
||||
def read_encoder(self):
|
||||
return self._read_axispar(ENCODER_ADR, ANGLE_SCALE) + self.zero
|
||||
if self.has_encoder:
|
||||
return self._read_axispar(ENCODER_ADR, ANGLE_SCALE) + self.zero
|
||||
return self.read_steppos()
|
||||
|
||||
def read_steppos(self):
|
||||
return self._read_axispar(STEPPOS_ADR, ANGLE_SCALE) + self.zero
|
||||
|
||||
def read_home(self):
|
||||
return not self.comm(GET_IO, 255) & 8
|
||||
|
||||
def write_has_home(self, value):
|
||||
"""activate pullup resistor"""
|
||||
return bool(self.comm(SET_IO, 0, value))
|
||||
|
||||
@Command(FloatRange())
|
||||
def set_zero(self, value):
|
||||
"""adjust zero"""
|
||||
self.write_zero(value - self.read_value())
|
||||
"""adapt zero to make current position equal to given value"""
|
||||
raw = self.read_value() - self.zero
|
||||
self.write_zero(value - raw)
|
||||
|
||||
def read_baudrate(self):
|
||||
return self.comm(GET_GLOB_PAR, 65)
|
||||
|
||||
def write_baudrate(self, value):
|
||||
self.comm(SET_GLOB_PAR, 65, int(value))
|
||||
"""a baudrate change takes effect only after power cycle"""
|
||||
return self.comm(SET_GLOB_PAR, 65, int(value))
|
||||
|
||||
@Command()
|
||||
def reset(self):
|
||||
"""set steppos to encoder value, if not within tolerance"""
|
||||
if self._started:
|
||||
if self._run_mode:
|
||||
raise IsBusyError('can not reset while moving')
|
||||
tol = ENCODER_RESOLUTION * 1.1
|
||||
for itry in range(10):
|
||||
diff = self.read_encoder() - self.read_steppos()
|
||||
if abs(diff) <= tol:
|
||||
self._need_reset = False
|
||||
self.status = self.Status.IDLE, 'ok'
|
||||
self.status = IDLE, 'ok'
|
||||
return
|
||||
self._write_axispar(self.encoder - self.zero, STEPPOS_ADR, ANGLE_SCALE, readback=False)
|
||||
self.comm(MOVE, 0, (self.encoder - self.zero) / ANGLE_SCALE)
|
||||
time.sleep(0.1)
|
||||
if itry > 5:
|
||||
tol = self.tolerance
|
||||
self.status = self.Status.ERROR, 'reset failed'
|
||||
self.status = ERROR, 'reset failed'
|
||||
return
|
||||
|
||||
@Command()
|
||||
def stop(self):
|
||||
"""stop motor immediately"""
|
||||
self._run_mode = 2 # stopping
|
||||
self.comm(MOTOR_STOP, 0)
|
||||
self.status = self.Status.IDLE, 'stopped'
|
||||
self._started = 0
|
||||
self.status = BUSY, 'stopping'
|
||||
self.setFastPoll(False)
|
||||
|
||||
@Command()
|
||||
def step_forward(self):
|
||||
"""move one full step forwards
|
||||
|
||||
for quick tests
|
||||
"""
|
||||
self.comm(MOVE, 1, FULL_STEP / ANGLE_SCALE)
|
||||
|
||||
@Command()
|
||||
def step_back(self):
|
||||
"""move one full step backwards
|
||||
|
||||
for quick tests
|
||||
"""
|
||||
self.comm(MOVE, 1, - FULL_STEP / ANGLE_SCALE)
|
||||
|
||||
@Command(IntRange(), result=IntRange())
|
||||
@Command(IntRange(), result=IntRange(), export=False)
|
||||
def get_axis_par(self, adr):
|
||||
"""get arbitrary motor parameter"""
|
||||
return self.comm(GET_AXIS_PAR, adr)
|
||||
|
||||
@Command((IntRange(), IntRange()), result=IntRange())
|
||||
@Command((IntRange(), IntRange()), result=IntRange(), export=False)
|
||||
def set_axis_par(self, adr, value):
|
||||
"""set arbitrary motor parameter"""
|
||||
return self.comm(SET_AXIS_PAR, adr, value)
|
||||
|
Reference in New Issue
Block a user