# -*- coding: utf-8 -*- # ***************************************************************************** # # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # Markus Zolliker # # ***************************************************************************** """drivers for trinamic PD-1161 motors""" import time import os import struct import json from secop.core import BoolType, Command, EnumType, FloatRange, IntRange, \ HasIodev, Parameter, Property, Drivable, TupleOf, Done from secop.bytesio import BytesIO from secop.lib import getGeneralConfig from secop.errors import CommunicationFailedError, HardwareError MOTOR_STOP = 3 MOVE = 4 SET_AXIS_PAR = 5 GET_AXIS_PAR = 6 SAVE_AXIS_PAR = 7 LOAD_AXIS_PAR = 8 SET_GLOB_PAR = 9 GET_GLOB_PAR = 10 SAVE_GLOB_PAR = 11 LOAD_GLOB_PAR = 12 SET_IO = 14 GET_IO = 15 MOT_FACT = object() CONFIG = getGeneralConfig() class AxisPar(Parameter): SET = SET_AXIS_PAR GET = GET_AXIS_PAR def __init__(self, adr, min=None, max=None, unit=None, scale=1, readonly=False, poll=True): if unit: datatype = FloatRange(min, max, unit=unit) else: datatype = IntRange(min, max) super().__init__('_', datatype, readonly=readonly, poll=poll) self.adr = adr self.scale = scale def to_raw(self, motor, value): return round(value / self.scale) def from_raw(self, motor, value): return value * self.scale def read(self, motor): result = round(self.from_raw(motor, motor.comm(self.GET, self.adr)), 3) # print('read', self.adr, result) return result def write(self, motor, value): rvalue = self.to_raw(motor, value) motor.comm(self.SET, self.adr, rvalue) reply = motor.comm(self.GET, self.adr) if rvalue != reply: raise HardwareError('%s: reply does not match: %r != %r' % (self.name, rvalue, reply)) return round(self.from_raw(motor,reply), 3) def __set_name__(self, owner, name): super().__set_name__(owner, name) setattr(owner, 'read_' + name, lambda self, pobj=self: pobj.read(self)) if not self.readonly: setattr(owner, 'write_' + name, lambda self, value, pobj=self: pobj.write(self, value)) class IoPar(AxisPar): SET = SET_IO GET = GET_IO def __init__(self, adr, bank, datatype=EnumType(CLOSED=0, OPEN=1), scale=1, readonly=False, poll=True): Parameter.__init__(self, '_', datatype, readonly=readonly, poll=poll) self.adr = adr + 1000 * bank self.scale = scale def read(self, motor): return motor.comm(self.GET, self.adr) def write(self, motor, value): motor.comm(self.SET, self.adr, int(value)) return value class AxisDiffPar(AxisPar): @staticmethod def to_raw(motor, value): return round(value / motor.fact) @staticmethod def from_raw(motor, value): return round(value * motor.fact, 3) class AxisZeroPar(AxisPar): @staticmethod def to_raw(motor, value): return round(value / motor.fact - motor.zero) @staticmethod def from_raw(motor, value): return round(value * motor.fact + motor.zero, 3) BAUDRATES = [9600, 0, 19200, 0, 38400, 57600, 0, 115200] class Motor(HasIodev, Drivable): address = Property('module address', IntRange(0, 255), default=1) fact = Property('gear factor', FloatRange(unit='deg/step'), default=1.8/256) limit_pin_mask = Property('input pin mask for lower/upper limit switch', TupleOf(IntRange(0, 15), IntRange(0, 15)), default=(8, 0)) value = Parameter('motor position', FloatRange(unit='deg')) target = Parameter('_', FloatRange(unit='$'), default=0) movelimit = Parameter('max. angle to drive in one go', FloatRange(unit='$'), default=400, readonly=False) zero = Parameter('zero point', FloatRange(unit='$'), readonly=False, default=0) tolerance = Parameter('positioning tolerance', FloatRange(unit='$'), readonly=False, default=0.9) encoder = Parameter('encoder value', FloatRange(unit='$'), needscfg=False) steppos = Parameter('motor steps position', FloatRange(unit='$'), needscfg=False) # at_upper_limit = Parameter('upper limit switch touched', BoolType(), needscfg=False) # at_lower_limit = Parameter('lower limit switch touched', BoolType(), needscfg=False) pull_up = Parameter('activate pull up resistors', BoolType(), needscfg=False) baudrate = Parameter('baud rate code', EnumType({'%d' % v: i for i, v in enumerate(BAUDRATES)}), readonly=False, default=0, poll=True) temperature = IoPar(9, 1, IntRange(), readonly=True) operating_voltage = IoPar(8, 1, IntRange(), readonly=True) enc_tolerance = AxisDiffPar(212, 0, 360., '$') maxspeed = AxisPar(4, 0, 2047) minspeed = AxisPar(130, 0, 2047) maxcurrent = AxisPar(6, 0, 2.8, 'A', 2.8/250) standby_current = AxisPar(7, 0, 2.8, 'A', 2.8/250) acceleration = AxisPar(5, 0, 2047) internal_target = AxisZeroPar(0, unit='$') target_reached = AxisPar(8, readonly=True) move_status = AxisPar(207, readonly=True) error_bits = AxisPar(208, readonly=True) free_wheeling = AxisPar(204, 0, 60, 'sec', scale=0.001) fast_pollfactor = 0.001 _targettime = None _prevconn = None iodevClass = BytesIO _calcTimeout = True _poserror = None save_filename = None def earlyInit(self): self.writeDict.update(self.loadParams()) def loadParams(self): # the following should be moved to core functionality savedir = os.path.join(CONFIG['logdir'], 'persistent') os.makedirs(savedir, exist_ok=True) self._save_filename = os.path.join(savedir, '%s.%s.json' % (self.DISPATCHER.equipment_id, self.name)) try: with open(self._save_filename, 'r') as f: save_dict = json.load(f) except FileNotFoundError: save_dict = {} writeDict = {} for pname, value in save_dict.items(): pobj = self.parameters[pname] val = pobj.datatype.import_value(value) if pobj.readonly: try: pobj.value = val except Exception as e: self.log.warning('can not restore %r to %r' % (pname, val)) else: writeDict[pname] = val return writeDict def saveParams(self): save_dict = {} for pname, pobj in self.parameters.items(): if pname not in ('target', 'internal_target'): save_dict[pname] = pobj.export_value() tmpfile = self._save_filename + '.tmp' try: with open(tmpfile, 'w') as f: json.dump(save_dict, f, indent=2) f.write('\n') os.rename(tmpfile, self._save_filename) finally: try: os.remove(tmpfile) except FileNotFoundError: pass def startModule(self, started_callback): self._initialize = True def cb(self=self, started_callback=started_callback, encoder=self.encoder, zero=self.zero): encoder += self.zero - zero diff = encoder - self.encoder + 180 if abs(diff % 360 - 180) < self.enc_tolerance: diff = round((diff - 180) / 360) * 360 if diff: self.comm(SET_AXIS_PAR, 209, AxisZeroPar.to_raw(self, encoder + diff)) self._poserror = 'reset' else: self.log.error('encoder: saved value (%g) does not match reading (%g)' % (encoder, self.encoder)) self._poserror = 'need_reset' self.comm(SET_GLOB_PAR, 2255, 1) self._initialize = False self._prevconn = self._iodev._conn started_callback() super().startModule(cb) def comm(self, cmd, adr=0, value=0): if self._calcTimeout: self._calcTimeout = False baudrate = getattr(self._iodev._conn.connection, 'baudrate', None) if baudrate: if baudrate not in BAUDRATES: raise CommunicationFailedError('unsupported baud rate: %d' % baudrate) self._iodev.timeout = 0.03 + 200 / baudrate bank = adr // 1000 iadr = adr % 1000 for itry in range(3): byt = struct.pack('>BBBBi', self.address, cmd, iadr, bank, round(value)) try: reply = self._iodev.communicate(byt + bytes([sum(byt) & 0xff]), 9) except Exception as e: print(repr(e), adr, bank) continue if sum(reply[:-1]) & 0xff == reply[-1]: break else: raise CommunicationFailedError('checksum error') radr, modadr, status, rcmd, result = struct.unpack('>BBBBix', reply) if status != 100: self.log.warning('bad status from cmd %r %s: %d', cmd, adr, status) if radr != 2 or modadr != self.address or cmd != rcmd: raise CommunicationFailedError('bad reply %r to command %s %d' % (reply, cmd, adr)) return result def read_value(self): rawenc = self.comm(GET_AXIS_PAR, 209) rawpos = self.comm(GET_AXIS_PAR, 1) self.encoder = AxisZeroPar.from_raw(self, rawenc) self.steppos = AxisZeroPar.from_raw(self, rawpos) initialized = self._initialize or self.comm(GET_GLOB_PAR, 2255) # bank 2 adr 255 if initialized and self._prevconn == self._iodev._conn: # no power loss or connection interrupt self.saveParams() else: self.log.info('set to previous saved values') for pname, value in self.loadParams().items(): try: getattr(self, 'write_' + pname)(value) except Exception as e: self.log.warning('can not write %r to %r' % (value, pname)) self.comm(SET_GLOB_PAR, 2255, 1) self._prevconn = self._iodev._conn if abs(rawenc - rawpos) > 128: return self.encoder return self.steppos def read_status(self): if not self._targettime: if self._poserror: return self.Status.ERROR, 'encoder does not match internal pos' return self.status # self.get_limits() # if self.at_lower_limit or self.at_upper_limit: # self.stop() # return self.Status.ERROR, 'at limit' target_reached = self.read_target_reached() if target_reached or self.read_move_status(): self._targettime = None if abs(self.target - self.value) < self.tolerance: return self.Status.IDLE, '' self.log.warning('out of tolerance (%d)', self.move_status) return self.Status.WARN, 'out of tolerance (%d)' % self.move_status if time.time() > self._targettime: self.log.warning('move timeout') return self.Status.WARN, 'move timeout' return self.Status.BUSY, 'moving' def write_target(self, target): rawenc = self.comm(GET_AXIS_PAR, 209) rawpos = self.comm(GET_AXIS_PAR, 1) if abs(rawenc - rawpos) > 128: # adjust missing steps missing_steps = (rawenc - rawpos) / 256.0 #if self._poserror == 'reset': print(missing_steps, self._poserror) if abs(missing_steps) < 10 or self._poserror == 'reset': self.log.warning('correct missing steps (%.1f deg)', AxisDiffPar.from_raw(self, missing_steps)) self._poserror = None self.status = self.Status.IDLE, '' else: self._poserror = 'need_reset' raise ValueError('encoder does not match internal pos') rawpos += round(missing_steps) * 256 self.comm(SET_AXIS_PAR, 1, rawpos) if abs(target - self.encoder) > self.movelimit: raise ValueError('can not move more than %s deg' % self.movelimit) rawtarget = round((target - self.zero) / self.fact) delay = (abs(rawtarget - rawpos) / 30.5 / self.maxspeed + self.maxspeed / self.acceleration / 15.25 + 0.5) self._targettime = time.time() + delay self.log.info('move to %.1f', target) self.comm(MOVE, 0, rawtarget) print('write_target') self.target_reached = 0 self.status = self.Status.BUSY, 'changed target' return target # def get_limits(self): # self.input_bits = self.comm(GET_IO, 255) # bits = (~ self.input_bits) & 0xf # self.at_lower_limit = bool(bits & self.limit_pin_mask[0]) # self.at_upper_limit = bool(bits & self.limit_pin_mask[1]) # # def read_at_upper_limit(self): # self.get_limits() # return Done # # def read_at_lower_limit(self): # self.get_limits() # return Done def write_pull_up(self, value): self.comm(SET_IO, 0, int(value)) return value def write_baudrate(self, value): self.comm(SET_GLOB_PAR, 65, int(value)) def read_baudrate(self): reply = self.comm(GET_GLOB_PAR, 65) return reply @Command(FloatRange()) def set_zero(self, value): self.zero += value - self.read_value() @Command() def reset(self): if self._poserror: self._poserror = 'reset' self.write_target(self.encoder) @Command def stop(self): self.comm(MOTOR_STOP) self.status = self.Status.IDLE, 'stopped' self._targettime = None @Command(FloatRange()) def step(self, value): self.comm(MOVE, 1, value / self.fact) @Command(IntRange(), result=IntRange()) def get_axis_par(self, adr): return self.comm(GET_AXIS_PAR, adr) @Command(IntRange()) def set_axis_par(self, adr, value): return self.comm(SET_AXIS_PAR, adr, value)