frappy_psi.trinamic: various importments with limit switches

Change-Id: I112f12f1b6485c1af88648e4c1c420e5002b047c
This commit is contained in:
2026-05-04 16:17:46 +02:00
parent b6fb14250c
commit 8cb0844ac8
+157 -81
View File
@@ -26,10 +26,9 @@ import struct
from frappy.core import BoolType, Command, EnumType, FloatRange, IntRange, \
HasIO, Parameter, Property, Drivable, PersistentMixin, PersistentParam, Done, \
IDLE, BUSY, ERROR, Limit, nopoll, ArrayOf
from frappy.properties import HasProperties
IDLE, BUSY, WARN, ERROR, Limit, nopoll, StringType
from frappy.io import BytesIO
from frappy.errors import CommunicationFailedError, HardwareError, RangeError, IsBusyError
from frappy.errors import CommunicationFailedError, HardwareError, RangeError, IsBusyError, ImpossibleError
from frappy.rwhandler import ReadHandler, WriteHandler
from frappy.lib import formatStatusBits, clamp
@@ -70,14 +69,31 @@ HW_ARGS = {
'free_wheeling': (204, 0.01),
'power_down_delay': (214, 0.01),
'stall_thr': (174, 1),
'disable_right': (12, 1),
'disable_left': (13, 1),
'disable_right_switch': (12, 1),
'disable_left_switch': (13, 1),
}
# special handling (adjust zero):
ENCODER_ADR = 209
STEPPOS_ADR = 1
LEFT_MASK = 2
RIGHT_MASK = 4
LIMITS_MASK = 6
LIMITS_TEXT = {0: '',
LEFT_MASK: 'left limit switch hit',
RIGHT_MASK: 'right limit switch hit',
LIMITS_MASK: 'both limit switches hit - not plugged or wired badly'}
LIMITS_PAST = {0: '',
LEFT_MASK: 'left limit switch was hit',
RIGHT_MASK: 'right limit switch was hit',
LIMITS_MASK: 'both limit switches were hit'}
RUN_IDLE = 0
RUN_DRIVE = 1
RUN_STOP = 2
RUN_STOP_EXC = 3
def writable(*args, **kwds):
"""convenience function to create writable hardware parameters"""
@@ -132,21 +148,24 @@ class Motor(PersistentMixin, HasIO, Drivable):
readonly=False, default=0, visibility=3, group='more')
max_retry = Parameter('maximum number of retries', IntRange(0, 99), readonly=False, default=0)
stall_thr = Parameter('stallGuard threshold', IntRange(-64, 63), readonly=False, value=0)
input_bits = Parameter('input bits', IntRange(0, 255), group='more', export=False)
input1 = Parameter('input 1', BoolType(), export=False, group='more')
input2 = Parameter('input 2', BoolType(), export=False, group='more')
input3 = Parameter('input 3', BoolType(), export=False, group='more')
input_bits = Parameter('inputs',
EnumType(home=0, home_left=2, home_right=4, home_left_right=6,
none=8, left=10, right=12, left_right=14),
group='more', export=True)
output0 = Parameter('output 0', BoolType(), readonly=False, export=False, group='more', default=0)
output1 = Parameter('output 1', BoolType(), readonly=False, export=False, group='more', default=0)
# home is used in frappy_psi.motorvalve
home = Parameter('state of home switch (input 3)', BoolType(), group='more', export=False)
has_home = Property('enable home and activate pullup resistor', BoolType(), export=False,
default=True)
disable_right = writable('disable right limit switch', BoolType(), group='motorparam')
disable_left = writable('disable left limit switch', BoolType(), group='motorparam')
has_inputs = Property('inputs are polled', BoolType(), export=False,
default=False)
with_pullup = Property('activate pullup', BoolType(), export=False,
default=True)
has_home = Parameter('enable home and activate pullup resistor', BoolType(), export=True,
default=False, group='more')
has_limit_switches = Parameter('enable limit switches and activate pullup resistor',
BoolType(), default=False, group='motorparam')
disable_right_switch = writable('disable right limit switch', BoolType(), group='motorparam')
disable_left_switch = writable('disable left limit switch', BoolType(), group='motorparam')
has_inputs = Parameter('inputs are polled', BoolType(), group='more', default=False)
pullup_inputs = Parameter('activate pullup', BoolType(), group='more', default=False)
interlock_reason = Parameter('external interlock', StringType(),
group='more', readonly=False, default='')
pollinterval = Parameter(group='more')
target_min = Limit()
@@ -154,26 +173,14 @@ class Motor(PersistentMixin, HasIO, Drivable):
ioClass = BytesIO
fast_poll = 0.5
_run_mode = 0 # 0: idle, 1: driving, 2: stopping
_run_mode = RUN_IDLE
_calc_timeout = True
_need_reset = None
_last_change = 0
_loading = False # True when loading parameters
_drv_try = 0
def checkProperties(self):
super().checkProperties()
if self.has_home:
self.parameters['home'].export = '_home'
self.setProperty('has_inputs', True)
if self.has_inputs:
self.parameters['input3'].export = '_input3'
else:
self.setProperty('with_pullup', False)
def writeInitParams(self):
super().writeInitParams()
self.comm(SET_IO, 0, self.with_pullup)
_limit_error_state = 0
_idle_status = IDLE, ''
def comm(self, cmd, adr, value=0, bank=0):
"""set or get a parameter
@@ -216,6 +223,11 @@ class Motor(PersistentMixin, HasIO, Drivable):
raise CommunicationFailedError(f'bad reply {reply!r} to command {cmd} {adr}')
return result
def checkProperties(self):
super().checkProperties()
if self.has_home:
self.changeExport('home', True)
def startModule(self, start_events):
super().startModule(start_events)
@@ -251,7 +263,7 @@ class Motor(PersistentMixin, HasIO, Drivable):
if adjusted_encoder != encoder_from_hw:
self.log.info('take next closest encoder value (%.2f)', adjusted_encoder)
self._need_reset = True
self.status = ERROR, 'saved encoder value does not match reading'
self.status = self._idle_status = ERROR, 'saved encoder value does not match reading'
self._write_axispar(adjusted_encoder - self.zero, ENCODER_ADR, ANGLE_SCALE, readback=False)
def _read_axispar(self, adr, scale=1):
@@ -319,11 +331,11 @@ class Motor(PersistentMixin, HasIO, Drivable):
if self.has_encoder:
self.fix_encoder(encoder)
self._need_reset = True
self.status = ERROR, 'power loss'
self.status = self._idle_status = ERROR, 'power loss'
# or should we just fix instead of error status?
# self._write_axispar(self.steppos - self.zero, readback=False)
self.comm(SET_GLOB_PAR, 255, 1, bank=2) # set initialized flag
self._run_mode = 0
self._run_mode = RUN_IDLE
self.setFastPoll(False)
return encoder if abs(encoder - steppos) > self.tolerance else steppos
@@ -336,37 +348,66 @@ class Motor(PersistentMixin, HasIO, Drivable):
self._need_reset = True
if self.auto_reset:
return IDLE, 'encoder does not match internal pos'
if self.status[0] != ERROR:
if self._idle_status[0] != ERROR:
self.log.error('encoder (%.2f) does not match internal pos (%.2f)', self.encoder, self.steppos)
return ERROR, 'encoder does not match internal pos'
return self.status
if self.has_limit_switches:
limit_state = self.read_input_bits() & LIMITS_MASK
if limit_state:
if self._idle_status[0] != ERROR:
if limit_state != self._limit_error_state:
# no need to remember limits state
self._limit_error_state = 0
if limit_state == LIMITS_MASK: # both switches pressed is an error
return ERROR, LIMITS_TEXT[limit_state]
else: # one switch might be freed -> a warning
return WARN, LIMITS_TEXT[limit_state]
else:
if self._limit_error_state:
self._idle_status = None
return WARN, LIMITS_PAST[self._limit_error_state]
return self._idle_status
if self.interlock_reason:
return ERROR, self.interlock_reason
now = time.time()
if self.steppos != oldpos:
if self.steppos != oldpos or now < self._last_change + 0.25 + self.fast_poll:
self._last_change = now
return BUSY, 'stopping' if self._run_mode == 2 else 'moving'
if now < self._last_change + 0.25 + self.fast_poll:
return BUSY, 'stopping' if self._run_mode == 2 else 'moving'
if self.status[0] == BUSY:
return self.status
return BUSY, 'moving'
if self.has_limit_switches:
bits = self.read_input_bits()
if self._run_mode == RUN_DRIVE:
lim_state = bits & LIMITS_MASK
if lim_state & self._limits_check_mask:
self._need_reset = True
lim_text = LIMITS_TEXT[lim_state]
self._limit_error_state = lim_state
self._idle_status = ERROR, lim_text
self._stop_motor(lim_text)
return self.status
reason = ''
if self.read_move_status():
reason = self.move_status.name
elif self.read_error_bits() & 127:
reason = ',' .join(formatStatusBits(self.error_bits & 127, (
'stallGuard', 'over_temp', 'over_temp_warn', 'short_A', 'short_B',
'open_load_A', 'open_load_B', 'standstill')))
elif self.target_reached:
reason = ''
else:
elif not self.target_reached:
reason = 'unknown'
self.setFastPoll(False)
if self._run_mode == 2:
self.target = self.value # indicate to customers that this was stopped
self._run_mode = 0
return IDLE, 'stopped'
self._run_mode = 0
if self._run_mode in (RUN_STOP, RUN_STOP_EXC):
if self._run_mode == RUN_STOP:
self.target = self.value # indicate to customers that this was stopped
self._run_mode = RUN_IDLE
return self._idle_status
self._run_mode = RUN_IDLE
diff = self.target - self.encoder
if abs(diff) <= self.tolerance:
if reason:
self.log.warning('target reached, but move_status = %s', reason)
return IDLE, ''
self._idle_status = IDLE, ''
return self._idle_status
if self._drv_try < self.max_retry:
self._drv_try += 1
self.log.info('try again %.1f %s', diff, reason)
@@ -374,7 +415,10 @@ class Motor(PersistentMixin, HasIO, Drivable):
return BUSY, f'try again'
if self.auto_reset:
self._need_reset = True
return IDLE, f'stalled: {reason}'
self._idle_status = IDLE, f'stalled: {reason}'
return self._idle_status
if self._limit_error_state:
return self._idle_status
self.log.error('out of tolerance by %.3g (%s)', diff, reason)
return ERROR, f'out of tolerance ({reason})'
@@ -403,21 +447,39 @@ class Motor(PersistentMixin, HasIO, Drivable):
if abs(diff) > self.tolerance:
if abs(diff) > self.encoder_tolerance and self.has_encoder:
self._need_reset = True
self.status = ERROR, 'encoder does not match internal pos'
self._idle_status = ERROR, 'encoder does not match internal pos'
raise HardwareError('need reset (encoder does not match internal pos)')
self._write_axispar(self.encoder - self.zero, STEPPOS_ADR, ANGLE_SCALE, readback=False)
self._last_change = time.time()
self._run_mode = 1 # driving
self._run_mode = RUN_DRIVE
self.setFastPoll(True, self.fast_poll)
self.log.debug('move to %.1f', target)
self.comm(MOVE, 0, (target - self.zero) / ANGLE_SCALE)
def write_target(self, target):
self._limit_error_state = 0
self._limits_check_mask = LIMITS_MASK
if self.interlock_reason:
raise ImpossibleError(self.interlock_reason)
limit_state = self.read_input_bits() & LIMITS_MASK
if limit_state:
diff = target - self.steppos
if limit_state == LIMITS_MASK: # limit sw. not connected
raise ImpossibleError(LIMITS_TEXT[limit_state])
if abs(diff) > 5:
raise ImpossibleError('do not drive by more than 5 degrees for freeing a limit switch')
if diff < 0:
if limit_state & LEFT_MASK:
raise ImpossibleError(LIMITS_TEXT[limit_state])
self._limits_check_mask = RIGHT_MASK
else:
if limit_state & RIGHT_MASK:
raise ImpossibleError(LIMITS_TEXT[limit_state])
self._limits_check_mask = LEFT_MASK
self._drv_try = 0
self._stable_since = 0
self.start_motor(target)
self.status = BUSY, 'changed target'
return target
def write_zero(self, value):
self.zero = value
@@ -432,6 +494,20 @@ class Motor(PersistentMixin, HasIO, Drivable):
def read_steppos(self):
return self._read_axispar(STEPPOS_ADR, ANGLE_SCALE) + self.zero
def write_pullup_inputs(self, value):
return self.comm(SET_IO, 0, value)
def write_has_home(self, value):
self.has_inputs = True
if value:
self.write_pullup_inputs(True)
def write_has_limit_switches(self, value):
self.has_inputs = True
self.write_pullup_inputs(True)
self.write_disable_left_switch(False)
self.write_disable_right_switch(False)
@Command(FloatRange())
def set_zero(self, value):
"""adapt zero to make current position equal to given value"""
@@ -455,23 +531,30 @@ class Motor(PersistentMixin, HasIO, Drivable):
diff = self.read_encoder() - self.read_steppos()
if abs(diff) <= tol:
self._need_reset = False
self.status = IDLE, 'ok'
self._limit_error_state = 0
self.status = self._idle_status = IDLE, 'ok'
return
self._write_axispar(self.encoder - self.zero, STEPPOS_ADR, ANGLE_SCALE, readback=False)
self.comm(MOVE, 0, (self.encoder - self.zero) / ANGLE_SCALE)
time.sleep(0.1)
if itry > 5:
tol = self.tolerance
self.status = ERROR, 'reset failed'
return
self.status = self._idle_status = ERROR, 'reset failed'
def _stop_motor(self, reason=None):
self._run_mode = RUN_STOP_EXC if reason else RUN_STOP
self.comm(MOTOR_STOP, 0)
self.status = BUSY, (f'stopping ({reason})' if reason else 'stopping')
self.setFastPoll(False)
@Command()
def stop(self):
"""stop motor immediately"""
self._run_mode = 2 # stopping
self.comm(MOTOR_STOP, 0)
self.status = BUSY, 'stopping'
self.setFastPoll(False)
self._stop_motor()
def write_interlock_reason(self, value):
if value and self._run_mode == RUN_DRIVE:
self._stop_motor(value)
@Command(IntRange(), result=IntRange(), export=False)
def get_axis_par(self, adr):
@@ -483,14 +566,22 @@ class Motor(PersistentMixin, HasIO, Drivable):
"""set arbitrary motor parameter"""
return self.comm(SET_AXIS_PAR, adr, value)
def _limit_bits_status(self, bits, direction=None):
limits = bits & LIMITS_MASK
if limits:
if limits == LIMITS_MASK:
return ERROR, 'both limit switches hit - not plugged or wired badly'
elif limits & LEFT_MASK:
return (ERROR if direction is not None and direction < 0 else WARN), 'left limit switch hit'
else:
return (ERROR if direction is not None and direction > 0 else WARN), 'right limit switch hit'
return None
def read_input_bits(self):
if not self.has_inputs:
return 0
bits = self.comm(GET_IO, 255)
self.input1 = bool(bits & 2)
self.input2 = bool(bits & 4)
self.input3 = bool(bits & 8)
self.home = not self.input3
bits = self.comm(GET_IO, 255) & 14
self.home = not (bits & 8)
return bits
@nopoll
@@ -498,21 +589,6 @@ class Motor(PersistentMixin, HasIO, Drivable):
self.read_input_bits()
return self.home
@nopoll
def read_input1(self):
self.read_input_bits()
return self.input1
@nopoll
def read_input2(self):
self.read_input_bits()
return self.input2
@nopoll
def read_input3(self):
self.read_input_bits()
return self.input3
def write_output0(self, value):
return self.comm(SET_IO, 0, value, bank=2)