# -*- coding: utf-8 -*- # ***************************************************************************** # # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # Markus Zolliker # # ***************************************************************************** """drivers for trinamic PD-1161 motors""" import time import os import struct from secop.core import BoolType, Command, EnumType, FloatRange, IntRange, \ HasIodev, Parameter, Property, Drivable, TupleOf, Done from secop.bytesio import BytesIO from secop.lib import getGeneralConfig from secop.errors import CommunicationFailedError, HardwareError MOTOR_STOP = 3 MOVE = 4 SET_AXIS_PAR = 5 GET_AXIS_PAR = 6 SAVE_AXIS_PAR = 7 LOAD_AXIS_PAR = 8 SET_GLOB_PAR = 9 GET_GLOB_PAR = 10 SAVE_GLOB_PAR = 11 LOAD_GLOB_PAR = 12 SET_IO = 14 GET_IO = 15 MOT_FACT = object() CONFIG = getGeneralConfig() class AxisPar(Parameter): SET = SET_AXIS_PAR GET = GET_AXIS_PAR def __init__(self, adr, min=None, max=None, unit=None, scale=1, readonly=False, poll=True): if unit: datatype = FloatRange(min, max, unit=unit) else: datatype = IntRange(min, max) super().__init__('_', datatype, readonly=readonly, poll=poll) self.adr = adr self.scale = scale def to_raw(self, motor, value): return round(value / self.scale) def from_raw(self, motor, value): return value * self.scale def read(self, motor): return round(self.from_raw(motor, motor.comm(self.GET, self.adr)), 3) def write(self, motor, value): rvalue = self.to_raw(motor, value) motor.comm(self.SET, self.adr, rvalue) reply = motor.comm(self.GET, self.adr) if rvalue != reply: raise HardwareError('%s: reply does not match: %r != %r' % (self.name, rvalue, reply)) return round(self.from_raw(motor,reply), 3) def __set_name__(self, owner, name): super().__set_name__(owner, name) setattr(owner, 'read_' + name, lambda self, pobj=self: pobj.read(self)) if not self.readonly: setattr(owner, 'write_' + name, lambda self, value, pobj=self: pobj.write(self, value)) class IoPar(AxisPar): SET = SET_IO GET = GET_IO def __init__(self, adr, bank, datatype=EnumType(CLOSED=0, OPEN=1), scale=1, readonly=False, poll=True): Parameter.__init__(self, '_', datatype, readonly=readonly, poll=poll) self.adr = adr + 1000 * bank self.scale = scale def read(self, motor): return motor.comm(self.GET, self.adr) def write(self, motor, value): motor.comm(self.SET, self.adr, int(value)) return value class AxisDiffPar(AxisPar): @staticmethod def to_raw(motor, value): return round(value / motor.fact) @staticmethod def from_raw(motor, value): return round(value * motor.fact, 3) class AxisZeroPar(AxisPar): @staticmethod def to_raw(motor, value): return round(value / motor.fact - motor.zero) @staticmethod def from_raw(motor, value): return round(value * motor.fact + motor.zero, 3) BAUDRATES = [9600, 0, 19200, 0, 38400, 57600, 0, 115200] class Motor(HasIodev, Drivable): address = Property('module address', IntRange(0, 255), default=1) fact = Property('gear factor', FloatRange(unit='deg/step'), default=1.8/256) limit_pin_mask = Property('input pin mask for lower/upper limit switch', TupleOf(IntRange(0, 15), IntRange(0, 15)), default=(8, 0)) value = Parameter('motor position', FloatRange(unit='deg')) target = Parameter('_', FloatRange(unit='$'), default=0) zero = Parameter('zero point', FloatRange(unit='$'), readonly=False, default=0) tolerance = Parameter('positioning tolerance', FloatRange(unit='$'), readonly=False, default=0.9) encoder = Parameter('encoder value', FloatRange(unit='$'), needscfg=False) steppos = Parameter('motor steps position', FloatRange(unit='$'), needscfg=False) at_upper_limit = Parameter('upper limit switch touched', BoolType(), needscfg=False) at_lower_limit = Parameter('lower limit switch touched', BoolType(), needscfg=False) pull_up = Parameter('activate pull up resistors', BoolType(), needscfg=False) baudrate = Parameter('baud rate code', EnumType({'%d' % v: i for i, v in enumerate(BAUDRATES)}), readonly=False, default=0, poll=True) temperature = IoPar(9, 1, IntRange(), readonly=True) operating_voltage = IoPar(8, 1, IntRange(), readonly=True) enc_tolerance = AxisDiffPar(212, 0, 360., '$') maxspeed = AxisPar(4, 0, 2047) minspeed = AxisPar(130, 0, 2047) maxcurrent = AxisPar(6, 0, 2.8, 'A', 2.8/250) standby_current = AxisPar(7, 0, 2.8, 'A', 2.8/250) acceleration = AxisPar(5, 0, 2047) internal_target = AxisZeroPar(0, unit='$') target_reached = AxisPar(8, readonly=True) move_status = AxisPar(207, readonly=True) error_bits = AxisPar(208, readonly=True) free_wheeling = AxisPar(204, 0, 60, 'sec', scale=0.001) fast_pollfactor = 0.001 _targettime = None _prevconn = None iodevClass = BytesIO _commcnt = 0 _pollcnt = 0 _lastpoll = 0 _calcTimeout = True def earlyInit(self): self.resetValues = dict(self.writeDict) def startModule(self, started_callback): loaded_pos = None try: print(self.save_filename()) with open(self.save_filename()) as f: loaded_pos = float(f.read()) except FileNotFoundError: pass super().startModule(started_callback) encoder = self.read_encoder() if loaded_pos is not None: diff = loaded_pos - encoder + 180 if abs(diff % 360 - 180) < self.enc_tolerance: diff = round(diff / 180) * 180 if diff: self.comm(SET_AXIS_PAR, 209, AxisZeroPar.to_raw(self, encoder + diff)) else: msg = 'loaded pos (%g) does not match encoder reading' % loaded_pos self.status = self.Status.ERROR, msg self.log.error(msg + ' (%g)' % encoder) def comm(self, cmd, adr=0, value=0): if self._calcTimeout: self._calcTimeout = False baudrate = getattr(self._iodev._conn.connection, 'baudrate', None) if baudrate: if baudrate not in BAUDRATES: raise CommunicationFailedError('unsupported baud rate: %d' % baudrate) self._iodev.timeout = 0.03 + 200 / baudrate self._commcnt = self._commcnt + 1 bank = adr // 1000 iadr = adr % 1000 for itry in range(3): byt = struct.pack('>BBBBi', self.address, cmd, iadr, bank, round(value)) try: reply = self._iodev.communicate(byt + bytes([sum(byt) & 0xff]), 9) except Exception as e: print(repr(e), adr, bank) continue if sum(reply[:-1]) & 0xff == reply[-1]: break else: raise CommunicationFailedError('checksum error') radr, modadr, status, rcmd, result = struct.unpack('>BBBBix', reply) if status != 100: self.log.warning('bad status from cmd %r %s: %d', cmd, adr, status) if radr != 2 or modadr != self.address or cmd != rcmd: raise CommunicationFailedError('bad reply %r to command %s %d' % (reply, cmd, adr)) return result def read_value(self): now = time.time() if now > self._lastpoll + 1: print(now - self._lastpoll, self._commcnt, self._pollcnt) self._commcnt = 0 self._pollcnt = 0 self._lastpoll = now self._pollcnt = self._pollcnt + 1 init = self.comm(GET_GLOB_PAR, 2255) # bank 2 adr 255 if not init or self._prevconn != self._iodev._conn: # reset to initial parameters when either motor lost power or connection has changed self.log.info('reset initial values') for pname, value in self.resetValues.items(): getattr(self, 'write_' + pname)(value) self.comm(SET_GLOB_PAR, 2255, 1) self._prevconn = self._iodev._conn rawenc = self.comm(GET_AXIS_PAR, 209) rawpos = self.comm(GET_AXIS_PAR, 1) self.encoder = AxisZeroPar.from_raw(self, rawenc) self.save_pos() self.steppos = AxisZeroPar.from_raw(self, rawpos) if abs(rawenc - rawpos) > 128: return self.encoder return self.steppos def save_filename(self): savedir = os.path.join(CONFIG['logdir'], 'persistent') os.makedirs(savedir, exist_ok=True) return os.path.join(savedir, '%s.%s' % (type(self).__name__, self.name)) def save_pos(self): with open(self.save_filename(), 'w') as f: f.write('%g' % self.value) def read_status(self): if not self._targettime: return self.status self.get_limits() if self.at_lower_limit or self.at_upper_limit: self.stop() return self.Status.ERROR, 'at limit' target_reached = self.read_target_reached() if target_reached or self.read_move_status(): self._targettime = None if abs(self.target - self.value) < self.tolerance: return self.Status.IDLE, '' self.log.warning('out of tolerance (%d)', self.move_status) return self.Status.WARN, 'out of tolerance (%d)' % self.move_status if time.time() > self._targettime: self.log.warning('move timeout') return self.Status.WARN, 'move timeout' return self.Status.BUSY, 'moving' def write_target(self, target): rawenc = self.comm(GET_AXIS_PAR, 209) rawpos = self.comm(GET_AXIS_PAR, 1) if abs(rawenc - rawpos) > 128: # adjust missing steps missing_steps = round((rawenc - rawpos) / 256.0) self.log.warning('correct %s missing steps', missing_steps) rawpos += missing_steps * 256 self.comm(SET_AXIS_PAR, 1, rawpos) rawtarget = round((target - self.zero) / self.fact) delay = (abs(rawtarget - rawpos) / 30.5 / self.maxspeed + self.maxspeed / self.acceleration / 15.25 + 0.5) self._targettime = time.time() + delay self.log.info('move to %.1f', target) self.comm(MOVE, 0, rawtarget) print('write_target') self.target_reached = 0 self.status = self.Status.BUSY, 'changed target' return target def get_limits(self): self.input_bits = self.comm(GET_IO, 255) bits = (~ self.input_bits) & 0xf self.at_lower_limit = bool(bits & self.limit_pin_mask[0]) self.at_upper_limit = bool(bits & self.limit_pin_mask[1]) def read_at_upper_limit(self): self.get_limits() return Done def read_at_lower_limit(self): self.get_limits() return Done def write_pull_up(self, value): self.comm(SET_IO, 0, int(value)) return value def write_baudrate(self, value): self.comm(SET_GLOB_PAR, 65, int(value)) def read_baudrate(self): reply = self.comm(GET_GLOB_PAR, 65) print('BAUD', reply) return reply @Command(FloatRange()) def set_zero(self, value): self.zero += value - self.read_value() @Command def stop(self): self.comm(MOTOR_STOP) self.status = self.Status.IDLE, 'stopped' self._targettime = None @Command(FloatRange()) def step(self, value): self.comm(MOVE, 1, value / self.fact) @Command(IntRange(), result=IntRange()) def get_axis_par(self, adr): return self.comm(GET_AXIS_PAR, adr) @Command(IntRange()) def set_axis_par(self, adr, value): return self.comm(SET_AXIS_PAR, adr, value)