This commit is contained in:
gac-S_Changer
2020-09-08 11:53:11 +02:00
parent 226db0ad02
commit c85d4b4aa9
251 changed files with 19362 additions and 0 deletions
+83
View File
@@ -0,0 +1,83 @@
class BarcodeReader(DeviceBase):
def __init__(self, name, microscan, microscan_cmd):
DeviceBase.__init__(self, name)
self.microscan = microscan
self.microscan_cmd = microscan_cmd
def doInitialize(self):
self.disable()
self.readout = None
self.processing = False
self.task_callable=None
def enable(self):
self.microscan_cmd.write("<H>")
self.setState(State.Ready)
def disable(self):
self.microscan_cmd.write("<I>")
self.setState(State.Disabled)
def get(self,timeout=1.0):
self.state.assertReady()
try:
self.setState(State.Busy)
self.microscan.flush()
ret = self.microscan.waitString(int(timeout * 1000))
if ret is not None:
ret = ret.strip()
self.setCache(ret, None)
return ret
except:
self.setCache(None, None)
return None
finally:
if self.state == State.Busy:
self.setState(State.Ready)
def doUpdate(self):
self.get()
def read(self,timeout=1.0):
if self.processing:
raise Exception("Ongoing read operation")
self.processing = True
try:
initial = self.state
if initial == State.Disabled:
self.enable()
try:
return self.get(timeout)
finally:
if initial == State.Disabled:
self.disable()
finally:
self.processing = False
def _read_task(self, timeout):
global readout
self.readout = self.read(timeout)
return self.readout
def start_read(self, timeout=1.0):
self.readout = None
self.task_callable = fork((self._read_task, (timeout,)))
def get_readout(self):
return self.readout
def wait_readout(self):
if self.task_callable is not None:
join(self.task_callable)
self.task_callable = None
return self.readout
add_device(BarcodeReader("barcode_reader", mscan_pin, mscan_pin_cmd), force = True)
add_device(BarcodeReader("barcode_reader_puck", mscan_puck, mscan_puck_cmd), force = True)
+43
View File
@@ -0,0 +1,43 @@
def home_fast_table():
caput ("SAR-EXPMX1:ASYN.AOUT", "enable plc 1")
def get_fx_pos():
return caget("SAR-EXPMX:MOT_FX.RBV", 'f')
def set_fx_pos(pos):
return caput("SAR-EXPMX:MOT_FX.VAL", float(pos))
def get_fy_pos():
return caget("SAR-EXPMX:MOT_FY.RBV", 'f')
def set_fy_pos(pos):
return caput("SAR-EXPMX:MOT_FY.VAL", float(pos))
def get_ry_pos():
return caget("SAR-EXPMX:MOT_ROT_Y.RBV", 'f')
def set_ry_pos(pos):
return caput("SAR-EXPMX:MOT_ROT_Y.VAL", float(pos))
def get_cz_pos():
return caget("SAR-EXPMX:MOT_CZ.RBV", 'f')
def set_cz_pos(pos):
return caput("SAR-EXPMX:MOT_CZ.VAL", float(pos))
def get_cx_pos():
return caget("SAR-EXPMX:MOT_CX.RBV", 'f')
def set_cx_pos(pos):
return caput("SAR-EXPMX:MOT_CX.VAL", float(pos))
def set_mount_position():
set_fx_pos(0.0)
set_fy_pos(0.0)
set_ry_pos(0.0)
set_cz_pos(0.0)
set_cx_pos(0.0)
+175
View File
@@ -0,0 +1,175 @@
import ch.psi.pshell.device.DiscretePositionerBase as DiscretePositionerBase
import requests
import json
class Hexiposi(DiscretePositionerBase):
def __init__(self, name, url):
DiscretePositionerBase.__init__(self, name, ["A","B","C","D","E","F"])
self.PORT_SET=8002
self.PORT_GET=8002
if not url.startswith("http://"):
url = "http://" + url
if not url.endswith(":"):
url = url + ":"
self.url_set = url + str (self.PORT_SET)+ "/TellWeb/"
self.url_get = url + str (self.PORT_GET)+ "/TellWeb/"
self.moved = True
self.homing_state = State.Disabled
self.rback = self.UNKNOWN_POSITION
self.timeout = 3.0
self.offline = False
def doInitialize(self):
super(Hexiposi, self).doInitialize()
self.val = self.doReadReadback()
def get_response(self, response):
if (response.status_code!=200):
raise Exception (response.text)
return json.loads(response.text)
def get_status(self):
try:
self.status = self.get_response(requests.get(url=self.url_get+"get", timeout=self.timeout))
self.estop = self.status["estop"]
self.homed = self.status["homed"]
self.error = self.status["errorCode"]
self.remote = self.status["mode"] == "remote"
self.moving = self.status["errorCode"]
self.pos = self.status["position"]
self.moving = self.status["moving"]
self.offset = self.status["offset"]
self.dpos = self.status["discretePosition"]
if (self.homed==False): rback = self.UNKNOWN_POSITION
elif self.dpos == 1: rback = "B" # "A"
elif self.dpos == 2: rback = "C" # "B"
elif self.dpos == 4: rback = "D" # "C"
elif self.dpos == 8: rback = "E" # "D"
elif self.dpos == 16: rback = "F" # "E"
elif self.dpos == 32: rback = "A" # "F"
else: rback = self.UNKNOWN_POSITION
if (rback == self.UNKNOWN_POSITION) or (rback != self.rback):
self.moved = True
self.rback = rback
self.offline = False
return self.status
except:
self.offline = True
self.updateState()
raise
def set_deadband(self, value = 0.1): #degrees
ret = self.get_response(requests.get(url=self.url_set+"setDeadband?deadband=" + str(value), timeout=self.timeout))
if ret["deadbandOutput"] == value:
return value
raise Excepiton("Error setting deadband: " + str(ret))
def move_pos(self, pos):
return self.get_response(requests.get(url=self.url_set+"set?pos=" + str(pos), timeout=self.timeout))
def move_home(self):
ret = self.get_response(requests.get(url=self.url_set+"set?home=1", timeout=self.timeout))
try:
self.waitState(self.homing_state,1200)
except:
pass
return ret
def stop_move(self):
return self.get_response(requests.get(url=self.url_set+"set?stop=1", timeout=self.timeout))
def set_offset(self, offset):
return self.get_response(requests.get(url=self.url_set+"setOffset?offset="+str(offset), timeout=self.timeout))
def doUpdate(self):
self.get_status()
super(Hexiposi, self).doUpdate()
def doStop(self):
self.stop_move()
def doRead(self):
return str(self.val)
def doReadReadback(self):
self.get_status()
return self.rback
def doWrite(self, val):
#val = ord(val) - ord('A') +1
val = ord(val) - ord('B') +1
if val==0: val=6 #A
if val<1 or val>6:
raise Exception("Invalid value: " + str(val))
moving = val != self.val
self.val = val
self.move_pos(self.val)
#Workaround as state does not changes immediatelly
if moving:
#try:
# self.waitState(State.Busy,1200)
#except:
# print sys.exc_info()[1]
start = time.time()
while self.state != State.Busy:
if time.time() - start > 1.5:
print "Timeout waiting Hexiposi busy"
break
self.update()
def is_in_position(self, pos):
return self.take() == pos
def assert_in_position(self, pos):
if not is_in_position(pos):
raise Exception ("Hexiposi is not in position")
def assert_homed(self):
if self.homed == False:
raise Exception ("Hexiposi is not homed")
def assert_in_known_position(self):
self.get_status()
if self.rback == 'Unknown':
raise Exception("Hexiposi is in an unknown position, please home.")
#def isReady(self):
# self.get_status()
# return self.moving == False
def updateState(self):
if self.isSimulated():
self.setState(State.Ready)
elif self.offline:
self.setState(State.Offline)
elif self.homed == False:
self.setState(self.homing_state)
elif self.moving:
self.setState(State.Busy)
else:
self.setState(State.Ready)
#http://myriox06da:8002/TellWeb/get
dev = Hexiposi("hexiposi", "myriotell6d")
#If no Rotation Lid is mounted set it to simulated
#dev.setSimulated()
add_device(dev, True)
hexiposi.polling=1000
#print dev.url
#print dev.get_status()
class hexiposi_position(ReadonlyRegisterBase):
def doRead(self):
try:
return float(hexiposi.pos)
except:
return float("nan")
add_device(hexiposi_position(), True)
hexiposi_position.polling = 1000
hexiposi.set_deadband(1.0)
+27
View File
@@ -0,0 +1,27 @@
class LaserDistance(ReadonlyRegisterBase):
def __init__(self):
ReadonlyRegisterBase.__init__(self, "laser_distance")
def doRead(self):
ret = ue.readable.read()
ret = None if ret is None else (0.0 if math.isnan(ret) else ret)
return ret
class ListenerAI (DeviceListener):
def onValueChanged(self, device, value, former):
if laser_distance is not None:
value = None if value is None else (0.0 if math.isnan(value) else value)
laser_distance.setCache(value, None)
for l in ue.listeners:
if Nameable.getShortClassName(l.getClass()) == "ListenerAI":
ue.removeListener(l)
listenerAI = ListenerAI()
ue.addListener(listenerAI)
laser_distance=LaserDistance()
add_device(laser_distance, True)
laser_distance.update()
+46
View File
@@ -0,0 +1,46 @@
import ch.psi.pshell.device.DiscretePositionerBase as DiscretePositionerBase
class LedPositioner(DiscretePositionerBase):
def __init__(self):
DiscretePositionerBase.__init__(self, "led_ctrl", ["On", "Off"])
self.setState(State.Ready)
self.val = self.doReadReadback()
def doRead(self):
return self.val
def doReadReadback(self):
return "On" if get_led_state() else "Off"
def doWrite(self, val):
self.val = val
if self.val == "On":
set_led_state(True)
else:
set_led_state(False)
add_device(LedPositioner(), True)
led_ctrl.polling = 1000
import ch.psi.pshell.device.ProcessVariableConfig as ProcessVariableConfig
class LedLevel(ProcessVariableBase):
def __init__(self, name):
ProcessVariableBase.__init__(self, name, ProcessVariableConfig())
def doRead(self):
return get_led_level()
def doWrite(self, val):
return set_led_level(val)
led_level = LedLevel("led_level")
led_level.config.minValue = 0.0
led_level.config.maxValue = 100.0
led_level.config.unit = "%"
add_device(led_level, True)
led_level.polling = 1000
+134
View File
@@ -0,0 +1,134 @@
import traceback
from datetime import datetime
class Detector(ReadonlyRegisterBase):
def __init__(self, name):
ReadonlyRegisterBase.__init__(self, name)
self.reset()
def set_inputs(self, inputs):
self.inputs = inputs
if self.take() != inputs.values():
self.setCache(inputs.values(), None)
if self.getParent()!=None:
self.getParent().updateCache()
if (len(self.take()) == 0):
self.setState(State.Offline)
else:
self.setState(State.Ready)
def set_input(self, index, val):
self.inputs[index] = val
self.set_inputs(self.inputs)
def reset(self):
self.id = None
self.sn = None
self.status = None
self.type = None
self.set_inputs({})
class Esera(TcpDevice):
def __init__(self, name, server, timeout = 1000, retries = 1):
TcpDevice.__init__(self, name, server)
self.setMode(self.Mode.FullDuplex)
self.detectors = []
for i in range(30):
self.detectors.append(Detector("Detector " + str(i+1)))
self.setChildren(self.detectors)
self.completed_initializatiod = False
self.debug = False
def start(self):
self.getLogger().info("Starting controller")
self.write("set,sys,run,1\n")
def stop(self):
self.getLogger().info("Stopping controller")
self.write("set,sys,run,0\n")
def list(self):
self.write("get,owb,listall1\n")
def req_data(self):
self.write("get,sys,data\n")
def doInitialize(self):
super(Esera, self).doInitialize()
self.init_timestamp = time.time()
try:
self.setState(State.Ready) #So can communicate
for det in self.detectors:
det.reset()
self.list()
time.sleep(1.0)
self.check_started()
self.req_data()
except:
print >> sys.stderr, traceback.format_exc()
self.getLogger().warning(traceback.format_exc())
raise
def doUpdate(self):
self.check_started()
self.req_data()
def updateCache(self):
#print "Update"
cache = []
for det in self.detectors:
cache.append(det.take())
self.setCache(cache, None)
def check_started(self):
if not self.completed_initializatiod:
init = True
for det in self.detectors:
if det.id == None:
init = False
break
if init:
self.completed_initializatiod = True
print("Completed initialization")
self.getLogger().info("Completed initialization")
self.start()
def onString(self, msg):
if self.debug:
print datetime.now() , " - " , msg
tokens = msg.split("|")
if len(tokens)>1:
try:
if msg[:3] == "LST":
#LST|1_OWD1|3AF361270000009E|S_0|DS2413|
if tokens[1] > 1:
index = int(tokens[1].split("_")[1][3:]) - 1
if index < len(self.detectors):
det = self.detectors[index]
det.id = tokens[1]
det.sn= tokens[2] if len(tokens)>2 else None
det.status = int(tokens[3][2:]) if len(tokens)>3 else None
det.type = tokens[4] if len(tokens)>4 else None
if det.status!= 0:
det.set_inputs({})
else:
for det in self.detectors:
if det.id is not None and msg.startswith(det.id):
det_id = int(tokens[0][len(det.id)+1:])
det.set_input(det_id, int(tokens[1]))
except:
print >> sys.stderr, traceback.format_exc()
self.getLogger().log(traceback.format_exc())
add_device(Esera("onewire", "129.129.126.83:5000"), force = True)
onewire.setPolling(500)
add_device(onewire.detectors[0], force = True)
add_device(onewire.detectors[1], force = True)
add_device(onewire.detectors[2], force = True)
+46
View File
@@ -0,0 +1,46 @@
class RobotModbus(DeviceBase):
def __init__(self, name):
DeviceBase.__init__(self, name)
robot_req.write(0)
def execute(self, command, *argv):
if robot_req.read() != 0:
raise Exception("Ongoing command")
if robot_ack.read() != 0:
raise Exception("Robot is not ready")
robot_cmd.write(command)
args = [0] * robot_args.size
index = 0
for arg in argv:
args[index] = arg
index = index + 1
if index == robot_args.size:
raise Exception("Invalid number of arguments")
robot_args.write(to_array(args, 'i'))
try:
self.request()
err = robot_ack.take()
if err == 1:
ret = robot_ret.read()
return ret
if err == 2:
raise Exception("Invalid command: " + str(command))
raise Exception("Unknown error: " + str(err))
finally:
self.cancel_request()
def request(self):
robot_req.write(1)
while robot_ack.read() == 0:
time.sleep(0.001)
def cancel_request(self):
robot_req.write(0)
while robot_ack.read() != 0:
time.sleep(0.001)
def mount(self, puck, sample):
return self.execute('1', '1', puck, sample)
add_device(RobotModbus("robot_mb"), force = True)
+62
View File
@@ -0,0 +1,62 @@
import ch.psi.pshell.device.PositionerConfig as PositionerConfig
ROBOT_MOTORS = ["x" , "y", "z", "rx", "ry", "rz"]
class RobotCartesianMotor (PositionerBase):
def __init__(self, robot, index):
PositionerBase.__init__(self, robot.name + "_" + ROBOT_MOTORS[index], PositionerConfig())
self.index = index
self.robot = robot
#ATTENTION: Always initialize cartesian motors before scanning (or call robot.set_motors_enabled(True))
def doInitialize(self):
self.setCache(self.doRead(), None)
def doRead(self):
return float("nan") if self.robot.cartesian_destination is None else float(self.robot.cartesian_destination[self.index])
def doWrite(self, value):
if self.robot.cartesian_destination is not None:
#print "Move " + ROBOT_MOTORS[self.index] + " to " + str(value)
self.robot.cartesian_destination[self.index] = float(value)
self.robot.set_pnt(robot.cartesian_destination , "tcp_p")
self.robot.movel("tcp_p", self.robot.tool , DESC_SCAN)
def doReadReadback(self):
return float("nan") if self.robot.cartesian_pos is None else float(self.robot.cartesian_pos[self.index])
ROBOT_JOINT_MOTORS = ["j1" , "j2", "j3", "j4", "j5", "j6"]
class RobotJointMotor (PositionerBase):
def __init__(self, robot, index):
PositionerBase.__init__(self, robot.name + "_" + ROBOT_JOINT_MOTORS[index], PositionerConfig())
self.index = index
self.robot = robot
def doInitialize(self):
self.setpoint = self.doReadReadback()
self.setCache(self.doRead(), None)
def doRead(self):
return self.setpoint
def doWrite(self, value):
#print "Move " + ROBOT_JOINT_MOTORS[self.index] + " to " + str(value)
self.setpoint = value
joint = self.robot.herej()
joint[self.index] = value
self.robot.set_jnt(joint, name="tcp_j")
self.robot.movej("tcp_j", self.robot.tool , DESC_SCAN)
def doReadReadback(self):
return self.robot.herej()[self.index] if self.robot.joint_pos is None else float(self.robot.joint_pos[self.index])
+358
View File
@@ -0,0 +1,358 @@
TOOL_CALIBRATION = "tCalib"
TOOL_SUNA= "tSuna"
TOOL_DEFAULT= TOOL_SUNA
FRAME_TABLE = "fTable"
DESC_FAST = "mFast"
DESC_SLOW = "mSlow"
DESC_SCAN = "mScan"
DESC_DEFAULT = DESC_FAST
AUX_SEGMENT = "X"
DEFAULT_ROBOT_POLLING = 500
TASK_WAIT_ROBOT_POLLING = 50
run("devices/RobotTCP")
simulation = False
joint_forces = False
class RobotSC(RobotTCP):
def __init__(self, name, server, timeout = 1000, retries = 1):
RobotTCP.__init__(self, name, server, timeout, retries)
self.set_tasks(["getDewar", "putDewar", "putGonio", "getGonio", "recover", "moveDewar", "moveCold", "movePark", "moveGonio","moveHeater", "moveScanner","moveHome", "moveAux"])
self.set_known_points(["pPark", "pGonioA", "pDewar", "pGonioG", "pScan", "pHeater", "pHeat", "pHeatB", "pLaser","pHelium", "pHome", "pCold", "pAux"])
self.setPolling(DEFAULT_ROBOT_POLLING)
self.last_command_timestamp = None
self.last_command_position = None
#self.setSimulated()
def move_dewar(self):
self.start_task('moveDewar')
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_dewar()
self.last_command_position = "dewar"
self.last_command_timestamp = time.time()
def move_cold(self):
self.start_task('moveCold')
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_cold()
self.last_command_position = "cold"
self.last_command_timestamp = time.time()
def move_home(self):
self.start_task('moveHome')
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_home()
self.last_command_position = "home"
self.last_command_timestamp = time.time()
def get_dewar(self, segment, puck, sample):
segment = self.toSegmentNumber(segment)
self.start_task('getDewar',segment, puck, sample, is_room_temp())
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_dewar()
self.last_command_position = "dewar"
self.last_command_timestamp = time.time()
def put_dewar(self, segment, puck, sample):
segment = self.toSegmentNumber(segment)
self.assert_dewar()
self.start_task('putDewar',segment, puck, sample, is_room_temp())
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
#self.assert_dewar()
self.assert_cold()
self.last_command_position = "dewar"
self.last_command_timestamp = time.time()
def put_gonio(self):
assert_detector_safe()
pin_offset = get_pin_offset()
pin_angle_offset = get_pin_angle_offset()
print "Pin offset = " + str(pin_offset)
self.start_task('putGonio', pin_offset)
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_gonio()
self.last_command_position = "gonio"
self.last_command_timestamp = time.time()
def get_gonio(self):
assert_detector_safe()
pin_offset = get_pin_offset()
print "Pin offset = " + str(pin_offset)
self.start_task('getGonio', pin_offset)
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_gonio()
self.last_command_position = "gonio"
self.last_command_timestamp = time.time()
def get_aux(self, sample):
self.assert_aux()
self.start_task('getAuxiliary', sample)
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_aux()
self.last_command_position = "aux"
self.last_command_timestamp = time.time()
def put_aux(self, sample):
self.assert_aux()
self.start_task('putAuxiliary', sample)
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_aux()
self.last_command_position = "aux"
self.last_command_timestamp = time.time()
def move_scanner(self):
self.start_task('moveScanner')
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_scanner()
self.last_command_position = "scanner"
self.last_command_timestamp = time.time()
def move_laser(self):
self.start_task('moveScanner')
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_laser()
self.last_command_position = "scanner"
self.last_command_timestamp = time.time()
#def do_scan(self):
# self.start_task('doScan')
# self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
# self.assert_scan_stop()
def move_gonio(self):
assert_detector_safe()
self.start_task('moveGonio')
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_gonio()
self.last_command_position = "gonio"
self.last_command_timestamp = time.time()
def move_park(self):
self.start_task('movePark')
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_park()
self.last_command_position = "park"
self.last_command_timestamp = time.time()
def move_heater(self, speed=-1, to_bottom=True):
self.start_task('moveHeater', speed, to_bottom)
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
if to_bottom:
self.assert_heater_bottom()
else:
self.assert_heater()
self.last_command_position = "heater"
self.last_command_timestamp = time.time()
def robot_recover(self):
self.start_task('recover')
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_home()
self.last_command_position = "home"
self.last_command_timestamp = time.time()
def move_aux(self):
self.start_task('moveAux')
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_aux()
self.last_command_position = "aux"
self.last_command_timestamp = time.time()
def get_calibration_tool(self):
self.start_task('getCalTool')
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_scanner()
self.last_command_position = "scanner"
self.last_command_timestamp = time.time()
def put_calibration_tool(self):
self.start_task('putCalTool')
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_scanner()
self.command_timestamps["scanner"] = time.time()
def toSegmentNumber(self, segment):
if is_string(segment):
segment = ord(segment.upper()) - ord('A') +1
return segment
def on_event(self,ev):
#print "EVT: " + ev
pass
def on_change_working_mode(self, working_mode):
if get_device("hexiposi") is not None:
hexiposi.moved = True #Force image processing on first sample
def doUpdate(self):
#start = time.time()
RobotTCP.doUpdate(self)
global simulation
if not simulation:
if joint_forces:
if self.state != State.Offline:
self.get_joint_forces()
for dev in [jf1, jf2, jf3, jf4,jf5, jf6, jfc]:
dev.update()
#print time.time() -start
def start_task(self, program, *args, **kwargs):
#TODO: Check safe position
return RobotTCP.start_task(self, program, *args, **kwargs)
def stop_task(self):
RobotTCP.stop_task(self)
#TODO: Restore safe position
def set_remote_mode(self):
robot.set_profile("remote")
def set_local(self):
robot.set_profile("default")
def is_park(self):
return self.is_in_point("pPark")
def is_cold(self):
return self.is_in_point("pCold")
def is_home(self):
return self.is_in_point("pHome")
def is_dewar(self):
return self.is_in_point("pDewar")
def is_heater(self):
return self.is_in_point("pHeat")
def is_heater_home(self):
return self.is_in_point("pHeater")
def is_heater_bottom(self):
return self.is_in_point("pHeatB")
def is_gonio(self):
return self.is_in_point("pGonioA")
def is_helium(self):
return self.is_in_point("pHelium")
def is_scanner(self):
return self.is_in_point("pScan")
def is_aux(self):
return self.is_in_point("pAux")
def is_laser(self):
return self.is_in_point("pLaser")
def is_cleared(self):
#return self.is_home() or self.is_park() or self.is_dewar() or self.is_dewar_home()
return self.get_current_point() is not None
def assert_heater_home(self):
self.assert_in_point("pHeater")
def assert_cold(self):
self.assert_in_point("pCold")
def assert_heater(self):
self.assert_in_point("pHeat")
def assert_heater_bottom(self):
self.assert_in_point("pHeatB")
def assert_park(self):
self.assert_in_point("pPark")
def assert_home(self):
self.assert_in_point("pHome")
def assert_dewar(self):
self.assert_in_point("pDewar")
def assert_gonio(self):
self.assert_in_point("pGonioA")
def assert_helium(self):
self.assert_in_point("pHelium")
def assert_scanner(self):
self.assert_in_point("pScan")
def assert_aux(self):
self.assert_in_point("pAux")
def assert_laser(self):
self.assert_in_point("pLaser")
def assert_cleared(self):
if not self.is_cleared():
raise Exception("Robot not in cleared position")
def wait_ready(self):
robot.waitState(State.Ready, 1000) #robot.state.assertReady()
if simulation:
add_device(RobotSC("robot","localhost:1000"),force = True)
else:
add_device(RobotSC("robot", "TellRobot6D:1000"), force = True)
#add_device(RobotSC("robot", "129.129.110.110:1000"), force = True)
#robot.latency = 0.005
robot.set_default_desc(DESC_DEFAULT)
robot.default_speed = 20
robot.set_frame(FRAME_DEFAULT)
robot.set_tool(TOOL_DEFAULT)
robot.setPolling(DEFAULT_ROBOT_POLLING)
robot.get_current_point() #TODO: REMOVE WHEN CURRENT POINT REPORTED BY POLLING MESSAGE
class jf1(ReadonlyRegisterBase):
def doRead(self):
return None if robot.joint_forces == None else robot.joint_forces[0]
class jf2(ReadonlyRegisterBase):
def doRead(self):
return None if robot.joint_forces == None else robot.joint_forces[1]
class jf3(ReadonlyRegisterBase):
def doRead(self):
return None if robot.joint_forces == None else robot.joint_forces[2]
class jf4(ReadonlyRegisterBase):
def doRead(self):
return None if robot.joint_forces == None else robot.joint_forces[3]
class jf5(ReadonlyRegisterBase):
def doRead(self):
return None if robot.joint_forces == None else robot.joint_forces[4]
class jf6(ReadonlyRegisterBase):
def doRead(self):
return None if robot.joint_forces == None else robot.joint_forces[5]
class jfc(ReadonlyRegisterBase):
def doRead(self):
if robot.joint_forces == None:
return float('NaN')
if robot.powered == False:
return float('NaN')
return (robot.joint_forces[1]+74)/4 + (robot.joint_forces[2]+30)/4 + (robot.joint_forces[4]-0.8)/0.2
if joint_forces:
add_device(jf1(), force = True)
add_device(jf2(), force = True)
add_device(jf3(), force = True)
add_device(jf4(), force = True)
add_device(jf5(), force = True)
add_device(jf6(), force = True)
add_device(jfc(), force = True)
+919
View File
@@ -0,0 +1,919 @@
import threading
FRAME_DEFAULT = "world"
FLANGE = "flange"
MAX_NUMBER_PARAMETERS = 20
run("devices/RobotMotors")
class RobotTCP(TcpDevice, Stoppable):
def __init__(self, name, server, timeout = 1000, retries = 1):
TcpDevice.__init__(self, name, server)
self.timeout = timeout
self.retries = retries
self.header = None
self.trailer = "\n"
self.array_separator = '|'
self.cmd_separator = ' '
self.msg_id = 0
self.working_mode = "invalid"
self.status = "invalid"
self.powered = None
self.settled = None
self.empty = None
self.working_mode = None
self.status = None
self.lock = threading.Lock()
self.joint_forces = None
self.current_task = None
self.current_task_ret = None
self.high_level_tasks = []
self.known_points = []
self.current_points = []
self.cartesian_destination = None
#self.flange_pos = [None] * 6
self.cartesian_pos = [None] * 6
self.joint_pos = [None] * 6
self.cartesian_motors_enabled = False
self.cartesian_motors = []
self.joint_motors_enabled = False
self.joint_motors = []
self.tool = None
self.default_desc = None
self.tool_open = None
#self.tool_trsf = [0.0] * 6
self.frame = FRAME_DEFAULT
self.polling_interval = 0.01
self.reset = True
self.default_tolerance = 5
self.default_speed = 100
self.latency = 0
self.last_msg_timestamp = 0
self.task_start_retries = 3
self.exception_on_task_start_failure = True #Tasks may start and be finished when checked
def doInitialize(self):
super(RobotTCP, self).doInitialize()
self.reset = True
def set_tool(self, tool):
self.tool = tool
#self.tool_trsf = self.get_tool_trsf()
self.evaluate("tcp_curtool=" + tool)
if self.cartesian_motors_enabled:
self.update()
self.set_motors_enabled(True)
self.is_tool_open()
def get_tool(self):
return self.tool
def set_frame(self, frame):
self.frame = frame
self.evaluate("tcp_curframe=" + frame)
if self.cartesian_motors_enabled:
self.update()
self.set_motors_enabled(True)
self.waitCacheChange(5000)
def get_frame(self):
return self.frame
def set_default_frame(self):
self.set_frame(FRAME_DEFAULT)
def assert_tool(self, tool=None):
if tool is None:
if self.tool is None:
raise Exception("Tool is undefined")
elif self.tool != tool:
raise Exception("Invalid tool: " + self.tool)
def set_default_desc(self,default_desc):
self.default_desc=default_desc
def get_default_desc(self):
return self.default_desc
def set_tasks(self,tasks):
self.high_level_tasks=tasks
def get_tasks(self):
return self.high_level_tasks
def set_known_points(self, points):
self.known_points=points
def get_known_points(self):
return self.known_points
def get_current_points(self, tolerance = None):
ret = self.is_in_points(*self.known_points, tolerance = tolerance)
current_points = []
for i in range(len(ret)):
if ret[i] == True:
current_points.append(self.known_points[i])
return current_points
def get_current_point(self, tolerance = None):
current_points = self.get_current_points(tolerance)
if (current_points is not None) and ( len(current_points) >0):
return current_points[0]
return None
def get_current_points_cached(self):
return self.current_points
def get_current_point_cached(self):
if (self.current_points is not None) and (len (self.current_points) >0):
return self.current_points[0]
return None
def assert_in_known_point(self, tolerance = None):
if self.get_current_point(tolerance) is None:
raise Exception ("Robot not in known point")
def _sendReceive(self, msg_id, msg = "", timeout = None):
if self.latency >0:
timespan = time.time() - self.last_msg_timestamp
if timespan<self.latency:
time.sleep(self.latency-timespan)
tx = self.header if (self.header != None) else ""
tx = tx + msg_id + " " + msg
if (len(tx)>150):
raise Exception("Exceeded maximum message size")
self.getLogger().finer("TX = '" + str(tx)+ "'")
if (self.trailer != None): tx = tx + self.trailer
if self.isSimulated():
return ""
rx = self.sendReceive(tx, msg_id, self.trailer , 0, self.timeout if timeout is None else timeout, self.retries)
self.last_msg_timestamp = time.time()
rx=rx[:-1] #Remove 0A
self.getLogger().finer("RX = '" + str(rx) + "'")
if rx[:3] != msg_id:
if (time.time()-start) >= timeout:
raise Exception("Received invalid message id: " + str(rx[:3]) + " - expecting:" + msg_id )
if len(rx)<4:
raise Exception("Invalid message size: " + str(len(rx)) )
if rx[3] == "*":
raise Exception(rx[4:])
return rx[4:]
def call(self, msg, timeout = None):
self.lock.acquire()
try:
id = "%03d" % self.msg_id
self.msg_id = (self.msg_id+1)%1000
return self._sendReceive(id, msg, timeout)
finally:
self.lock.release()
def execute(self, command, *args, **kwargs):
timeout = None if (kwargs is None) or (not kwargs.has_key("timeout")) else kwargs["timeout"]
msg = str(command)
if len(args)>MAX_NUMBER_PARAMETERS:
raise Exception("Exceeded maximum number of parameters")
for i in range(len(args)):
msg += (self.cmd_separator if (i==0) else self.array_separator) + str(args[i])
rx = self.call(msg, timeout)
if rx.count(self.array_separator)>0:
return rx.split(self.array_separator)
return rx
def evaluate(self, cmd, timeout=None):
ret = self.execute('eval', cmd, timeout=timeout)
if is_string(ret):
if ret.strip() != "": raise Exception(ret)
def get_var(self, name):
return self.execute('get_var', name)
#Makes app crash
#def get_str(self, name='s'):
# return self.execute('get_str', name)
def get_arr(self, name, size):
return self.execute('get_arr', name, size)
def get_bool(self, name = "tcp_b"):
return True if (self.execute('get_bool', name).strip() == '1') else False
def get_int(self, name ="tcp_n"):
return int(self.get_var(name))
def get_float(self, name ="tcp_n"):
return float(self.get_var(name))
def get_int_arr(self, size, name="tcp_a"):
# not working. A Jython bug in PyUnicaode?
#return [int(x) for x in self.get_arr("tcp_a", size)]
ret = []
a=self.get_arr(name, size)
for i in range(size):
ret.append(int(a[i]))
return ret
def get_float_arr(self, size, name="tcp_a"):
#return [float(x) for x in self.get_arr("tcp_a", size)]
a=self.get_arr(name, size)
ret = [];
for i in range(size): ret.append(float(a[i]));
return ret
def get_trsf(self, name="tcp_t"):
a = self.execute('get_trf', name)
ret = []
for i in range(6): ret.append(float(a[i]))
return ret
def set_trsf(self, l, name="tcp_t"):
self.evaluate(name + "={" + str(l[0]) + ","+ str(l[1]) + ","+ str(l[2]) + ","+ str(l[3]) + ","+ str(l[4]) + ","+ str(l[5]) + "}")
def get_jnt(self, name="tcp_j"):
a = self.execute('get_jnt', name)
ret = []
for i in range(6): ret.append(float(a[i]))
return ret
def set_jnt(self, l, name="tcp_j"):
self.evaluate(name + "={" + str(l[0]) + ","+ str(l[1]) + ","+ str(l[2]) + ","+ str(l[3]) + ","+ str(l[4]) + ","+ str(l[5]) + "}")
def get_pnt(self, name="tcp_p"):
#a = self.execute('get_pnt', name)
#ret = []
#for i in range(6): ret.append(float(a[i]))
#return ret
return self.get_trsf(name+".trsf")
#trsf = (x,y,z,rx,ry,rz)
#TODO: config = (shoulder, elbow, wrist)
def set_pnt(self, trsf, name="tcp_p", config=None):
self.set_trsf(trsf, name+".trsf")
def get_tool_trsf(self, name=None):
if name is None:
self.assert_tool()
name = self.tool
return self.get_trsf(name+".trsf")
def set_tool_trsf(self, trsf, name=None):
if name is None:
self.assert_tool()
name = self.tool
self.set_trsf(trsf, name+".trsf")
def eval_int(self, cmd):
if self.isSimulated():
return 0
self.evaluate("tcp_n=" + cmd)
return self.get_int()
def eval_float(self, cmd):
if self.isSimulated():
return 0.0
self.evaluate("tcp_n=" + cmd)
return self.get_float()
def eval_bool(self, cmd):
if self.isSimulated():
return False
self.evaluate("tcp_b=" + cmd)
return self.get_bool()
#def eval_str(self, cmd):
# self.evaluate("s=" + cmd)
# return self.get_str()
def eval_jnt(self, cmd):
self.evaluate("tcp_j=" + cmd)
return self.get_jnt()
def eval_trf(self, cmd):
self.evaluate("tcp_t=" + cmd)
return self.get_trsf()
def eval_pnt(self, cmd):
self.evaluate("tcp_p=" + cmd)
return self.get_pnt()
#Robot control
def is_powered(self):
self.powered = self.eval_bool("isPowered()")
return self.powered
def enable(self):
if not self.is_powered():
self.evaluate("enablePower()")
time.sleep(1.0)
if not self.is_powered():
raise Exception("Cannot enable power")
#waits for power to be actually cut off
def disable(self):
self.evaluate("disablePower()", timeout=5000)
def get_monitor_speed(self):
self.speed = self.eval_int("getMonitorSpeed()")
return self.speed
def set_monitor_speed(self, speed):
ret = self.eval_int("setMonitorSpeed(" + str(speed) + ")")
if (ret==-1): raise Exception("The robot is not in remote working mode")
if (ret==-2): raise Exception("The monitor speed is under the control of the operator")
if (ret==-3): raise Exception("The specified speed is not supported")
def set_default_speed(self):
set_monitor_speed(self.default_speed)
def is_calibrated(self):
return self.eval_bool("isCalibrated()")
def save_program(self):
ret = self.execute('save', timeout=5000)
if str(ret) != "0": raise Exception("Error saving program: " + str(ret))
def _update_working_mode(self, mode, status):
cur_mode = self.working_mode
if mode==1:
self.working_mode = "manual"
self.status = "hold" if status==6 else "move"
elif mode==2:
self.working_mode = "test"
self.status = "hold" if status==3 else "move"
elif mode==3:
self.working_mode = "local"
self.status = "hold" if status==2 else "move"
elif mode==4:
self.working_mode = "remote"
self.status = "hold" if status==2 else "move"
else:
self.working_mode = "invalid"
self.status = "invalid"
if self.working_mode != cur_mode:
try:
self.on_change_working_mode(self.working_mode)
except:
pass
def read_working_mode(self):
try:
mode = self.eval_int("workingMode(tcp_a)")
status = int(self.get_var("tcp_a[0]"))
self._update_working_mode(mode, status)
self._update_state()
except:
self.working_mode = "invalid"
self.status = "invalid"
return self.working_mode
def get_emergency_stop_sts(self):
st = self.eval_int("esStatus()")
if (st== 1): return "active"
if (st== 2): return "activated"
return "off"
def get_safety_fault_signal(self):
fault = self.eval_bool("safetyFault(s)")
#if (fault):
# return get_str()
return fault
#Motion control
def stop(self):
self.evaluate("stopMove()")
def resume(self):
self.evaluate("restartMove()")
def reset_motion(self, joint=None, timeout=None):
#TODO: in new robot robot.resetMotion() is freezing controller
#self.evaluate("resetMotion()" if (joint is None) else ("resetMotion(" + joint + ")"))
if joint is None:
self.execute('reset', timeout=timeout)
else:
self.execute('reset', str(joint), timeout=timeout)
def is_empty(self):
self.empty = self.eval_bool("isEmpty()")
self._update_state()
return self.empty
def is_settled(self):
self.settled = self.eval_bool("isSettled()")
self._update_state()
return self.settled
def get_move_id(self):
return self.eval_int("getMoveId()")
def set_move_id(self, id):
return self.evaluate("setMoveId(" + str(id) + " )")
def get_joint_forces(self):
try:
self.evaluate("getJointForce(tcp_a)")
self.joint_forces = self.get_float_arr(6)
return self.joint_forces
except:
self.joint_forces = None
raise
def movej(self, joint_or_point, tool=None, desc=None, sync=False):
if desc is None: desc = self.default_desc
if tool is None: tool = self.tool
#If joint_or_point is a list assumes ir is a point
if not is_string(joint_or_point):
robot.set_pnt(joint_or_point , "tcp_p")
joint_or_point = "tcp_p"
#TODO: in new robot movel and movej is freezing controller
#ret = self.eval_int("movej(" + joint_or_point + ", " + tool + ", " + desc +")")
ret = int(self.execute('movej',joint_or_point, tool, desc))
if sync:
self.wait_end_of_move()
return ret
def movel(self, point, tool=None, desc=None, sync=False):
if desc is None: desc = self.default_desc
if tool is None: tool = self.tool
if not is_string(point):
robot.set_pnt(point , "tcp_p")
point = "tcp_p"
#TODO: in new robot movel and movej is freezing controller
#ret = self.eval_int("movel(" + point + ", " + tool + ", " + desc +")")
ret = int(self.execute('movel',point, tool, desc))
if sync:
self.wait_end_of_move()
return ret
def movec(self, point_interm, point_target, tool=None, desc=None, sync=False):
if desc is None: desc = self.default_desc
if tool is None: tool = self.tool
#TODO: in new robot movel and movej is freezing controller
#ret = self.eval_int("movec(" + point_interm + ", " + point_target + ", " + tool + ", " + desc +")")
ret = int(self.execute('movec', point_interm, point_target, tool, desc))
if sync:
self.wait_end_of_move()
return ret
def wait_end_of_move(self):
time.sleep(0.05)
self.update()
#time.sleep(0.01)
#self.waitCacheChange(-1)
self.waitReady(-1)
#self.waitState(State.Ready, -1)
#Tool - synchronized as can freeze communication
"""
def open_tool(self, tool=None, timeout=3000):
#This function can timeout since it synchronizes move. Checking state before otherwise it can freeze the communication.
self.waitState(State.Ready, -1)
if tool is None: tool = self.tool
return self.evaluate("open(" + tool + " )", timeout=timeout)
def close_tool(self, tool=None, timeout=3000):
#This function can timeout since it synchronizes move. Checking state before otherwise it can freeze the communication.
self.waitState(State.Ready, -1)
if tool is None: tool = self.tool
return self.evaluate("close(" + tool + " )", timeout=timeout)
"""
#Tool - Not synchronized calls: atention to open/close only when state is Ready
def open_tool(self, tool=None):
if tool is None: tool = self.tool
self.evaluate(tool + ".gripper=true")
self.tool_open = True
def close_tool(self, tool=None):
if tool is None: tool = self.tool
self.evaluate(tool + ".gripper=false")
self.tool_open = False
def is_tool_open(self, tool=None):
if tool is None: tool = self.tool
self.tool_open = robot.eval_bool(tool + ".gripper")
return self.tool_open
#Arm position
def herej(self):
return self.eval_jnt("herej()")
def distance_t(self, trsf1, trsf2):
return self.eval_float("distance(" + trsf1 + ", " + trsf2 + ")")
def distance_p(self, pnt1, pnt2):
return self.eval_float("distance(" + pnt1 + ", " + pnt2 + ")")
def compose(self, pnt, frame = None, trsf = "tcp_t"):
if frame is None: frame = self.frame
return self.eval_pnt("compose(" + pnt + ", " + frame + ", " + trsf + ")")
def here(self, tool=None, frame=None):
if tool is None: tool = self.tool
if frame is None: frame = self.frame
return self.eval_pnt("here(" + tool + ", " + frame + ")")
def joint_to_point(self, tool=None, frame=None, joint="tcp_j"):
if tool is None: tool = self.tool
if frame is None: frame = self.frame
return self.eval_pnt("jointToPoint(" + tool + ", " + frame + ", " + joint +")")
def point_to_joint(self, tool=None, initial_joint="tcp_j", point="tcp_p"):
if tool is None: tool = self.tool
if self.eval_bool("pointToJoint(" + tool + ", " + initial_joint + ", " + point +", j)"):
return self.get_jnt()
def position(self, point, frame=None):
if frame is None: frame = self.frame
return self.eval_trf("position(" + point + ", " + frame + ")")
#Profile
def get_profile(self):
return self.execute('get_profile', timeout=2000)
def set_profile(self, name, pwd = ""):
self.execute('set_profile', str(name), str(pwd), timeout=5000)
#Task control
def task_create(self, program, *args, **kwargs):
program = str(program)
priority = 10 if (kwargs is None) or (not kwargs.has_key("priority")) or (kwargs["priority"] is None) else kwargs["priority"]
name = str(program if (kwargs is None) or (not kwargs.has_key("name")) else kwargs["name"])
if self.get_task_status(name)[0] != -1:
raise Exception("Task already exists: " + name)
if priority<1 or priority > 100:
raise Exception("Invalid priority: " + str(priority))
cmd = program + '('
for i in range(len(args)):
val = args[i]
if type(val) == bool:
if val == True: val = "true"
elif val == False: val = "false"
cmd += str(val) + (',' if (i<(len(args)-1)) else '')
cmd+=')'
#TODO: in new robot exec taskCreate is freezing controller
#REMOVE if bug is fixed
self.execute('task_create',name, str(priority), program, *args)
#self.evaluate('taskCreate "' + name + '", ' + str(priority) + ', ' + cmd)
def task_suspend(self, name):
self.evaluate('taskSuspend("' + str(name)+ '")')
#waits until the task is ready for restart
def task_resume(self, name):
self.evaluate('taskResume("' + str(name)+ '",0)', timeout = 2000)
#waits for the task to be actually killed
def task_kill(self, name):
#self.evaluate('taskKill("' + str(name)+ '")', timeout = 5000)
self.execute('kill', str(name), timeout=5000)
def get_task_status(self, name):
if self.isSimulated():
return (-1,"Stopped")
code = self.eval_int('taskStatus("' + str(name)+ '")')
#TODO: String assignments in $exec causes application to freeze
#status = self
if code== -1: status = "Stopped"
elif code== 0: status = "Paused"
elif code== 1: status = "Running"
#else: status = self.execute('get_help', code)
else: status = "Error"
return (code,status)
def get_tasks_status(self, *pars):
ret = self.execute("task_sts", *pars)
ret = ret[0:len(pars)]
for i in range(len(ret)):
try:
ret[i] = int(ret[i])
except:
ret[i] = None
return ret
def _update_state(self):
#self.setState(State.Busy if self.status=="move" else State.Ready)
if self.state==State.Offline:
print "Communication resumed"
if self.reset or (self.state==State.Offline):
self.check_task()
if self.current_task is not None:
print "Ongoing task: " + self.current_task
if (not self.settled) or (self.current_task is not None): self.setState(State.Busy)
elif not self.empty: self.setState(State.Paused)
else: self.setState(State.Ready)
def doUpdate(self):
try:
start = time.time()
cur_task = self.current_task #Can change in background so cache it
if self.isSimulated():
sts = [1, 1,"1", 100, "1", "1", 0, 0, \
0, 0, 0, 0, 0, 0, \
0, 0, 0, 0, 0, 0, \
]
else:
sts = self.execute("get_status", cur_task)
self._update_working_mode(int(sts[0]), int(sts[1]))
self.powered = sts[2] == "1"
self.speed = int(sts[3])
self.empty = sts[4] == "1"
self.settled = sts[5] == "1"
#TODO: add tool open
if cur_task is not None:
if int(sts[6]) < 0:
log("Task "+ str(cur_task) + " finished with code: " + str(sts[7]), False)
try:
self.current_task, self.current_task_ret = None, int(sts[7])
except:
self.current_task, self.current_task_ret = None, None
for i in range(6):
self.joint_pos[i] = float(sts[8 + i])
for i in range(6):
self.cartesian_pos[i] = float(sts[14 + i])
ev_index = 20 #7
ev = sts[ev_index] if len(sts)>ev_index else ""
if len(ev.strip()) >0:
self.getLogger().info(ev)
self.on_event(ev)
self._update_state()
self.reset = False
self.setCache({"powered": self.powered,
"speed": self.speed,
"empty": self.empty,
"settled": self.settled,
"task": cur_task,
"mode": self.working_mode,
"status": self.status,
"open": self.tool_open,
"pos": self.get_current_point_cached() if self.state==State.Ready else None #TODO: make it calculated in robot by polling funtion
}, None)
if self.cartesian_motors_enabled:
for m in self.cartesian_motors:
m.readback.update()
if self.joint_motors_enabled:
for m in self.joint_motors:
m.readback.update()
except:
if self.state != State.Offline:
print >> sys.stderr, "Update error: " + str(sys.exc_info()[1])
self.setState(State.Offline)
#Cartesian space
"""
def get_cartesian_pos(self):
self.assert_tool()
return self.eval_pnt("jointToPoint(" + self.tool + ", " + self.frame + ", herej())")
#self.set_jnt(self.herej())
#return self.joint_to_point(tool, frame)
"""
#TODO: in new robot robot.evaluate("tcp_p=jointToPoint(tSuna, world, herej())") is freezing controller (cannot call herej directly)
#We can consider atomic because tcp_j is only accessed in comTcp
def get_cartesian_pos(self, tool=None, frame=None):
if tool is None:
self.assert_tool()
tool = self.tool
if frame is None:
frame = self.frame
#Do not work
#self.evaluate("tcp_j=herej(); tcp_p=jointToPoint(" + tool + ", " + frame + ", tcp_j)")
#return self.get_pnt()
a = self.execute('get_pos', tool, frame)
ret = []
for i in range(6): ret.append(float(a[i]))
return ret
def get_flange_pos(self, frame=None):
return get_cartesian_pos(FLANGE, frame)
def get_cartesian_destination(self, tool=None, frame=None):
if tool is None:
self.assert_tool()
tool = self.tool
if frame is None:
frame = self.frame
return self.here(tool, frame)
def get_distance_to_pnt(self, name):
#self.here(self.tool, self.frame) #???
self.set_pnt(self.get_cartesian_pos() )
return self.distance_p("tcp_p", name)
def is_in_point(self, p, tolerance = None): #Tolerance in mm
if (tolerance is None) and p in self.known_points:
#If checking a known point with default tolerance, updates the position cache
return p in self.get_current_points()
tolerance = self.default_tolerance if tolerance == None else tolerance
d = self.get_distance_to_pnt(p)
if d<0:
raise Exception ("Error calculating distance to " + p + " : " + str(d))
return d<tolerance
def get_distance_to_pnts(self, *pars):
ret = self.execute("dist_pnt", *pars)
ret = ret[0:len(pars)]
for i in range(len(ret)):
try:
ret[i] = float(ret[i])
except:
ret[i] = None
return ret
def is_in_points(self, *pars, **kwargs): #Tolerance in mm
tolerance = self.default_tolerance if (kwargs is None) or (not kwargs.has_key("tolerance")) or (kwargs["tolerance"] is None) else kwargs["tolerance"]
ret = self.get_distance_to_pnts(*pars)
for i in range(len(ret)):
if ret[i]<0:
ret[i] = None
else:
ret[i] = ret[i]<tolerance
if (tolerance == self.default_tolerance) and (set(self.known_points).issubset(set(pars))): #Only update cache if tolerance is default
current_points = []
for i in range(len(ret)):
if ret[i] == True:
current_points.append(self.known_points[i])
self.current_points = current_points
return ret
def assert_in_point(self, p, tolerance = None): #Tolerance in mm
if not self.is_in_point(p, tolerance):
raise Exception ("Not in position " + p)
#Cartesian peudo-motors
def set_motors_enabled(self, value):
if value !=self.cartesian_motors_enabled:
self.cartesian_motors_enabled = value
if value:
for i in range(6):
self.cartesian_motors.append(RobotCartesianMotor(self, i))
add_device(self.cartesian_motors[i], True)
self.cartesian_destination = self.get_cartesian_destination()
else:
for m in self.cartesian_motors:
remove_device(m)
self.cartesian_motors = []
self.cartesian_destination = None
else:
if value:
self.cartesian_destination = self.get_cartesian_destination()
for m in self.cartesian_motors:
m.initialize()
#Cartesian peudo-motors
def set_joint_motors_enabled(self, value):
if value !=self.joint_motors_enabled:
self.joint_motors_enabled = value
if value:
for i in range(6):
self.joint_motors.append(RobotJointMotor(self, i))
add_device(self.joint_motors[i], True)
else:
for m in self.joint_motors:
remove_device(m)
self.joint_motors = []
else:
if value:
for m in self.joint_motors:
m.initialize()
#Alignment
def align(self, point = None, frame = None, desc = None):
if frame is None: frame = self.frame
self.assert_tool()
if desc is None: desc = self.default_desc
if point is None:
self.set_pnt(self.get_cartesian_pos() )
else:
self.set_pnt(point)
np=self.eval_trf('align(tcp_p.trsf, ' + frame + '.trsf)')
self.set_pnt(np)
self.movej("tcp_p", self.tool, desc)
#TODO: The first command is not executed (but receive a move id)
self.movej("tcp_p", self.tool, desc, sync = True)
return np
#High-level, exclusive motion task.
def start_task(self, program, *args, **kwargs):
tasks = [t for t in self.high_level_tasks]
if (self.current_task is not None) and (not self.current_task in tasks):
tasks.append(self.current_task)
if not program in tasks:
tasks.append(program)
#for task in tasks:
# if self.get_task_status(task)[0]>=0:
# raise Exception("Ongoing high-level task: " + task)
ts = self.get_tasks_status(*tasks)
for i in range(len(ts)):
if ts[i] > 0:
raise Exception("Ongoing high-level task: " + tasks[i])
self.clear_task_ret()
for i in range(self.task_start_retries):
self.task_create(program, *args, **kwargs)
(code, status) = self.get_task_status(program)
if code>=0: break
if i < self.task_start_retries-1:
ret = self.get_task_ret(False)
if ret == 0: #Did't start
log("Retrying starting : " + str(program) + str(args) + " - status: " + str(status) + " (" + str(code) + ")", False)
print "Retrying starting : " + str(program) + str(args) + " - status: " + str(status) + " (" + str(code) + ")"
else :
print "Retrying aborted : " + str(program) + str(args) + " - ret: " + str(ret) + " - status: " + str(status) + " (" + str(code) + ")"
break
else:
log("Failed starting : " + str(program) + str(args), False)
print "Failed starting : " + str(program) + str(args)
if self.exception_on_task_start_failure:
raise Exception("Cannot start task: " + program + str(args))
log("Task started: " + str(program) + str(args) + " - status: " + str(status) + " (" + str(code) + ")", False)
self.current_task, self.current_task_ret = program, None
self.update()
#self._update_state()
#TODO: remove
if self.current_task is None:
log("Task finished in first polling : " + str(program) , False)
print "Task finished in first polling : " + str(program)
return code
def stop_task(self):
tasks = [t for t in self.high_level_tasks]
if (self.current_task is not None) and (not self.current_task in tasks):
tasks.append(self.current_task)
for task in tasks:
#if self.get_task_status(task)[0]>=0:
self.task_kill(task)
self.reset_motion()
def get_task_ret(self, cached = True):
return self.current_task_ret if cached else self.eval_int("tcp_ret")
def clear_task_ret(self):
return self.evaluate("tcp_ret=0")
def get_task(self):
return self.current_task
def check_task(self):
if self.current_task is None:
for task in self.high_level_tasks:
if self.get_task_status(task)[0]>=0:
self.current_task, self.current_task_ret = task, None
log("Task detected: " + str(self.current_task), False)
return self.current_task
def wait_task_finished (self, polling = None):
cur_polling = self.polling
if polling is not None:
self.polling = polling
try:
while self.get_task() != None:
time.sleep(self.polling_interval)
finally:
if polling is not None:
self.polling = cur_polling
ret = self.get_task_ret()
return ret
def assert_no_task(self):
task = self.check_task()
if task != None:
raise Exception("Ongoing task: " + task)
def on_event(self,ev):
pass
def on_change_working_mode(self, working_mode):
pass
+129
View File
@@ -0,0 +1,129 @@
class SmartMagnet(DeviceBase):
def __init__(self, name):
#DeviceBase.__init__(self, name, get_context().pluginManager.getDynamicClass("SmartMagnetConfig")())
DeviceBase.__init__(self, name, DeviceConfig({
"holdingCurrent":0.0,
"restingCurrent":0.0,
"mountCurrent":0.0,
"unmountCurrent":0.0,
"reverseCurrent":0.0,
"reverseTime":0.0,
}))
def doInitialize(self):
super(SmartMagnet, self).doInitialize()
self.get_current()
def set_current(self, current):
self.setCache(current, None)
smc_current.write(current)
def get_current(self):
cur = smc_current.read()
self.setCache(cur, None)
return cur
def get_current_rb(self):
self.assert_status()
return smc_current_rb.read()
def get_status(self):
return smc_magnet_status.read()
def assert_status(self):
pass
#if self.get_status() == False:
# raise Exception("Smart Magnet is in faulty status.")
def is_mounted(self):
self.assert_status()
m1 = smc_mounted_1.read()
m2 = smc_mounted_2.read()
if m2==m1:
raise Exception("Smart Magnet has invalid detection.")
return m2
def set_supress(self, value):
smc_sup_det.write(value)
def get_supress(self):
return smc_sup_det.read()
def check_mounted(self, idle_time =1.0, timeout = -1, interval = 0.01):
self.assert_status()
start = time.time()
last = None
while True:
try:
det = self.is_mounted()
except:
det = None
if det != last:
settling_timestamp = time.time()
last = det
else:
if det is not None:
if (time.time()-settling_timestamp > idle_time):
return det
if timeout >= 0:
if (time.time() - start) > timeout:
raise Exception("Timeout waiting for Smart Magnet detection.")
time.sleep(interval)
def doUpdate(self):
try:
if self.get_supress():
self.setState(State.Paused)
elif self.is_mounted():
self.setState(State.Busy)
else:
self.setState(State.Ready)
except:
self.setState(State.Fault)
def set_holding_current(self):
self.set_current(self.config.getFieldValue("holdingCurrent"))
def set_resting_current(self):
self.set_current(self.config.getFieldValue("restingCurrent"))
def set_mount_current(self):
self.set_current(self.config.getFieldValue("mountCurrent"))
def set_unmount_current(self):
self.set_current(self.config.getFieldValue("unmountCurrent"))
def set_reverse_current(self):
self.set_current(self.config.getFieldValue("reverseCurrent"))
def set_default_current(self):
if self.is_mounted():
self.set_holding_current()
else:
self.set_resting_current()
def is_resting_current(self):
is_resting = 2.0 > abs( float(self.get_current()) - float(self.config.getFieldValue("restingCurrent")) )
return is_resting
def apply_reverse(self):
reverse_wait = float(self.config.getFieldValue("reverseTime"))
if reverse_wait >0:
self.set_reverse_current()
time.sleep(reverse_wait)
#Setting resting current to better detect sample
def apply_resting(self):
if not self.is_resting_current():
self.set_resting_current()
add_device(SmartMagnet("smart_magnet"), force = True)
smart_magnet.polling = 1000
smart_magnet.set_default_current()
+175
View File
@@ -0,0 +1,175 @@
LED_LEVEL_ROOM_TEMPERATURE = 0.4
LED_LEVEL_LN2 = 1.0
###################################################################################################
# Leds
###################################################################################################
def set_led_level(level):
level = max(min(float(level),100.0),0.0)
set_setting("led_level", level)
led_ctrl_1.write(led_ctrl_1.config.maxValue * level / 100.0)
led_ctrl_2.write(led_ctrl_2.config.maxValue * level / 100.0)
led_ctrl_3.write(led_ctrl_3.config.maxValue * level / 100.0)
def get_led_level():
level = get_setting("led_level")
return float(0 if level is None else level)
def set_led_state(value):
"""
Turn leds on and off
"""
if value:
set_led_level(100.0)
else:
set_led_level(0.0)
def get_led_state():
"""
Returns led state (on/off)
"""
return led_ctrl_1.read() > 0
#TODO: The range should be set automatically reading LN2 sensor.
def set_led_range(room_temp = True):
"""
Led range should be limitted in room temperature
"""
max_val = LED_LEVEL_ROOM_TEMPERATURE if room_temp else LED_LEVEL_LN2
led_ctrl_1.config.maxValue = max_val
led_ctrl_1.config.save()
led_ctrl_2.config.maxValue = max_val
led_ctrl_2.config.save()
led_ctrl_3.config.maxValue = max_val
led_ctrl_3.config.save()
led_ctrl_1.initialize()
led_ctrl_2.initialize()
led_ctrl_3.initialize()
if led_ctrl_1.read() > max_val:
led_ctrl_1.write(max_val)
if led_ctrl_2.read() > max_val:
led_ctrl_2.write(max_val)
if led_ctrl_3.read() > max_val:
led_ctrl_3.write(max_val)
set_led_level(get_led_level())
def is_led_room_temp():
return led_ctrl_1.config.maxValue <= LED_LEVEL_ROOM_TEMPERATURE
###################################################################################################
# Safety release
###################################################################################################
def release_local():
"""
Release local safety
"""
release_local_safety.write(False)
time.sleep(0.01)
release_local_safety.write(True)
time.sleep(0.01)
release_local_safety.write(False)
def release_psys():
"""
Release psys safety
"""
if is_manual_mode():
raise Exception ("Cannot release PSYS in manual mode")
release_psys_safety.write(False)
time.sleep(0.01)
release_psys_safety.write(True)
time.sleep(0.01)
release_psys_safety.write(False)
###################################################################################################
# Drier
###################################################################################################
MAX_HEATER_TIME = 120000
def set_air_stream(state):
"""
"""
valve_1.write(state)
valve_2.write(not state)
set_heater_chrono = None
def monitor_heater_time():
time.sleep(0.5)
try:
while get_heater():
if set_heater_chrono.isTimeout(MAX_HEATER_TIME):
set_heater(False)
log("Heater timeout expired: turned off", False)
return
time.sleep(0.1)
except:
print sys.exc_info()
def get_heater():
"""
"""
return gripper_dryer.read()
def set_heater(state):
"""
"""
global set_heater_chrono
if get_heater() != state:
gripper_dryer.write(state)
if state:
set_heater_chrono = Chrono()
fork(monitor_heater_time)
def get_air_stream():
"""
"""
return valve_1.read()
def set_pin_cleaner(state):
"""
"""
valve_7.write(state)
valve_8.write(not state)
def get_pin_cleaner():
"""
"""
return valve_7.read()
monitor_pin_cleaner_task = None
def monitor_pin_cleaner(timeout):
global monitor_pin_cleaner_task
this = monitor_pin_cleaner_task
time.sleep(timeout)
if this == monitor_pin_cleaner_task:
set_pin_cleaner(False)
DEFAULT_PIN_CLEANER_TIMEOUT = 5.0
def start_pin_cleaner(timeout=DEFAULT_PIN_CLEANER_TIMEOUT):
global monitor_pin_cleaner_task
set_pin_cleaner(True)
set_pin_cleaner_chrono = Chrono()
monitor_pin_cleaner_task = fork((monitor_pin_cleaner,(timeout,),))[0]