Files
mxsc/script/RobotTCP.py
2017-02-23 12:01:36 +01:00

216 lines
6.7 KiB
Python

class RobotTCP(TcpDevice):
def __init__(self, name, server, timeout = 500, retries = 1):
TcpDevice.__init__(self, name, server)
self.timeout = timeout
self.retries = retries
self.header = None
self.trailer = "\n"
self.array_separator = '|'
self.cmd_separator = ' '
self.msg_id = 0
self.working_mode = "invalid"
self.status = "invalid"
self.powered = False
def _sendReceive(self, msg_id, msg = ""):
tx = self.header if (self.header != None) else ""
tx = tx + msg_id + " " + msg
if (len(tx)>127):
raise Exception("Exceeded maximum message size")
self.getLogger().finer("TX = '" + str(tx)+ "'")
if (self.trailer != None): tx = tx + self.trailer
rx = self.sendReceive(tx, None, self.trailer , 0, self.timeout, self.retries)
rx=rx[:-1] #Remove 0A
self.getLogger().finer("RX = '" + str(rx) + "'")
if rx[:3] != msg_id:
print rx
raise Exception("Received invalid message id: " + str(rx[:3]) + " - expecting:" + msg_id )
if len(rx)<4:
raise Exception("Invalid message size: " + str(len(rx)) )
if rx[3] == "*":
raise Exception(rx[4:])
return rx[4:]
def call(self, msg):
id = "%03d" % self.msg_id
self.msg_id = (self.msg_id+1)%1000
return self._sendReceive(id, msg)
def execute(self, command, *argv):
msg = str(command)
if len(argv)>10:
raise Exception("Exceeded maximum number of parameters")
for i in range(len(argv)):
msg += (self.cmd_separator if (i==0) else self.array_separator) + str(argv[i])
rx = self.call(msg)
if rx.count(self.array_separator)>0:
return rx.split(self.array_separator)
return rx
def read_event(self):
ev = self._sendReceive("EVT")
if ev.strip() == "": return None
return ev
def evaluate(self, cmd):
ret = self.execute('eval', cmd)
if ret.strip() != "": raise Exception(ret)
def get_var(self, name):
return self.execute('get_var', name)
def get_varb(self, name):
return True if (self.execute('get_bool', name).strip() == '1') else False
def get_str(self, name):
return self.execute('get_str', name)
def get_arr(self, name, size):
return self.execute('get_arr', name, size)
def get_int(self):
return int(self.get_var("n"))
def get_float(self):
return float(self.get_var("n"))
def get_bool(self, name):
return self.get_varb("b")
def get_int_arr(self, size):
# not working. A Jython bug in PyUnicaode?
#return [int(x) for x in self.get_arr("arr", size)]
ret = []
a=self.get_arr("arr", size)
for i in range(size):
ret.append(int(a[i]))
return ret
def get_float_arr(self, size):
#return [float(x) for x in self.get_arr("arr", size)]
ret = []
a=self.get_arr("arr", size)
for i in range(size):
ret.append(int(a[i]))
return ret
def eval_int(self, cmd):
self.evaluate("n=" + cmd)
return self.get_int()
def eval_float(self, cmd):
self.evaluate("n=" + cmd)
return self.get_float()
def eval_bool(self, cmd):
self.evaluate("b=" + cmd)
return self.get_bool(self)
def get_move_id(self):
return self.eval_int("getMoveId()")
def get_joint_forces(self):
robot_tcp.evaluate("getJointForce(arr)")
return robot_tcp.get_float_arr(6)
def get_move_id(self):
return self.eval_int("getMoveId()")
def get_emergency_stop_sts(self):
st = self.eval_int("esStatus()")
if (st== 1): return "active"
if (st== 2): return "activated"
return "off"
def get_safety_fault_signal(self):
fault = eval_bool("safetyFault(s)")
if (fault):
return get_str("")
def is_powered(self):
self.powered = self.eval_bool("isPowered()")
return self.powered
def get_monitor_speed(self):
self.powered = self.eval_int("getMonitorSpeed()")
return self.powered
def get_monitor_speed(self):
self.powered = self.eval_int("getMonitorSpeed()")
return self.powered
def set_monitor_speed(self, speed):
ret = self.eval_int("setMonitorSpeed(" + str(speed) + ")")
if (ret==-1): raise Exception("The robot is not in remote working mode")
if (ret==-2): raise Exception("The monitor speed is under the control of the operator")
if (ret==-3): raise Exception("The specified speed is not supported")
def enable(self):
return self.evaluate("enablePower()")
def disable(self):
return self.evaluate("disablePower()")
def is_calibrated(self):
return self.eval_bool("isCalibrated()")
def read_working_mode(self):
try:
mode = self.eval_int("workingMode(arr)")
status = int(self.get_var("arr[0]"))
if mode==1:
self.working_mode = "manual"
self.status = "hold" if status==6 else "move"
elif mode==2:
self.working_mode = "test"
self.status = "hold" if status==3 else "move"
elif mode==3:
self.working_mode = "local"
self.status = "hold" if status==2 else "move"
elif mode==4:
self.working_mode = "remote"
self.status = "hold" if status==2 else "move"
else:
self.working_mode = "invalid"
self.status = "invalid"
except:
self.working_mode = "invalid"
return self.working_mode
def update_status():
read_working_mode()
is_powered()
get_monitor_speed()
def poll_events():
ev = self.read_event()
if ec is not None:
self.getLogger().info(ev)
on_event(ev)
def doUpdate(self):
try:
self.update_status()
self.poll_events()
self.setState(State.Ready)
except:
self.setState(State.Offline)
def on_event(ev):
pass
def mount(self, puck, sample):
return self.execute('mount', puck, sample)
add_device(RobotTCP("robot_tcp", "129.129.126.100:1000"), force = True)
robot_tcp.setPolling(500)
robot_tcp.set_monitor_speed(20)