Files
gac-s_changer 55ef0a6f17
2023-05-05 13:40:37 +02:00

296 lines
9.4 KiB
Python

FRAME_TABLE = "fTable"
DESC_FAST = "mFast"
DESC_SLOW = "mSlow"
DESC_SCAN = "mScan"
DESC_DEFAULT = DESC_FAST
AUX_SEGMENT = "X"
RT_SEGMENT = "R"
DEFAULT_ROBOT_POLLING = 500
TASK_WAIT_ROBOT_POLLING = 50
run("devices/RobotTCP")
simulation = False
class RobotSC(RobotTCP):
def __init__(self, name, server, timeout = 1000, retries = 1):
RobotTCP.__init__(self, name, server, timeout, retries)
self.set_tasks([ "mvGonio","putGonio", "getGonio", "recover", "mvPark","mvHome", "mvRT", "mvAux" "getRT", "putRT","getAux","putAux", "mvScanner", "openLid", "closeLid"])
self.set_known_points(["pPark", "pGonioWait", "pGonioG","pHome", "pDewarTempRT", "pAux", "pScan"])
self.setPolling(DEFAULT_ROBOT_POLLING)
self.last_command_timestamp = None
self.last_command_position = None
self.ongoing_task = None
#self.gripper_open_value = False
#self.setSimulated()
def wait_async_motion(self):
og_position, og_assert = self.ongoing_task
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
if callable(og_assert):
og_assert()
self.last_command_position = og_position
self.last_command_timestamp = time.time()
self.ongoing_task = None
def move_rt_async(self):
self.start_task('mvRT')
self.ongoing_task = ("rt", self.assert_rt)
def move_rt(self):
#self.start_task('mvRT')
#self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
#self.assert_dewar()
self.move_rt_async()
self.wait_async_motion()
def move_home_async(self):
self.start_task('mvHome')
self.ongoing_task = ("home", self.assert_home)
def move_home(self):
self.move_home_async()
self.wait_async_motion()
#self.start_task('mvHome')
#self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
#self.assert_home()
def get_rt(self, segment, puck, sample, mounting_in_same_segment=False):
segment = self.toSegmentNumber(segment)
self.start_task('getRT',segment, sample)
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
#self.assert_rt()
self.last_command_position = "rt"
self.last_command_timestamp = time.time()
def put_rt(self, segment, puck, sample, mounting_in_same_segment=False):
segment = self.toSegmentNumber(segment)
self.assert_home()
self.start_task('putRT',segment, sample)
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
#self.assert_rt()
self.last_command_position = "rt"
self.last_command_timestamp = time.time()
def put_gonio(self):
pin_offset = get_pin_offset()
pin_angle_offset = get_pin_angle_offset()
print "Pin offset = " + str(pin_offset)
self.start_task('putGonio', pin_offset)
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_gonio()
if is_valve_controlled():
close_valve()
self.last_command_position = "gonio"
self.last_command_timestamp = time.time()
def get_gonio(self):
pin_offset = get_pin_offset()
self.start_task('getGonio', pin_offset)
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_gonio()
self.last_command_position = "gonio"
self.last_command_timestamp = time.time()
def get_aux(self, sample):
self.assert_aux()
self.start_task('getAux', sample)
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_aux()
self.last_command_position = "aux"
self.last_command_timestamp = time.time()
def put_aux(self, sample):
self.assert_aux()
self.start_task('putAux', sample)
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_aux()
self.last_command_position = "aux"
self.last_command_timestamp = time.time()
def move_scanner(self):
self.start_task('mvScanner')
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_scanner()
self.last_command_position = "scanner"
self.last_command_timestamp = time.time()
def move_laser(self):
self.start_task('mvScanner')
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_laser()
self.last_command_position = "scanner"
self.last_command_timestamp = time.time()
#def do_scan(self):
# self.start_task('doScan')
# self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
# self.assert_scan_stop()
def move_gonio_async(self):
self.start_task('mvGonio')
self.ongoing_task = ("gonio", self.assert_gonio)
def move_gonio(self):
self.move_gonio_async()
self.wait_async_motion()
#self.start_task('mvGonio')
#self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
#self.assert_gonio()
def move_park_async(self):
self.start_task('mvPark')
self.ongoing_task = ("park", self.assert_park)
def move_park(self):
self.move_park_async()
self.wait_async_motion()
#self.start_task('mvPark')
#self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
#self.assert_park()
def robot_recover(self):
self.start_task('recover')
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_home()
self.last_command_position = "home"
self.last_command_timestamp = time.time()
def move_aux_async(self):
self.start_task('mvAux')
self.ongoing_task = ("aux", self.assert_aux)
def move_aux(self):
self.move_aux_async()
self.wait_async_motion()
#self.start_task('mvRT')
#self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
#self.assert_rt()
def open_lid(self, segment):
segment = self.toSegmentNumber(segment)
self.start_task('openLid', segment)
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_home()
self.last_command_position = "open_lid"
self.last_command_timestamp = time.time()
def close_lid(self, segment):
segment = self.toSegmentNumber(segment)
self.start_task('closeLid', segment)
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_home()
self.last_command_position = "close_lid"
self.last_command_timestamp = time.time()
def toSegmentNumber(self, segment):
if is_string(segment):
segment = ord(segment.upper()) - ord('A') +1
return segment
def on_event(self,ev):
#print "EVT: " + ev
pass
def on_change_working_mode(self, working_mode):
if get_device("hexiposi") is not None:
hexiposi.moved = True #Force image processing on first sample
def start_task(self, program, *args, **kwargs):
#TODO: Check safe position
return RobotTCP.start_task(self, program, *args, **kwargs)
def stop_task(self):
RobotTCP.stop_task(self)
#TODO: Restore safe position
def gripper_has_sample(self):
"""checks gripper switches for presence of sample"""
return RT_sample_detection.read()
#hasSample, toolOpen = self.eval_bool("diToolHasSample"), self.eval_bool("diToolOpen")
#return (not toolOpen) and hasSample
def set_remote_mode(self):
robot.set_profile("remote")
def set_local(self):
robot.set_profile("default")
def is_park(self):
return self.is_in_point("pPark")
def is_home(self):
return self.is_in_point("pHome")
def is_gonio(self):
return self.is_in_point("pGonioWait")
def is_scanner(self):
return self.is_in_point("pScan")
def is_aux(self):
return self.is_in_point("pAux")
def is_rt(self):
return self.is_in_point("pDewarTempRT")
#def is_scan_stop(self):
# return self.is_in_point("pScanStop")
def is_cleared(self):
#return self.is_home() or self.is_park() or self.is_dewar() or self.is_dewar_home()
return self.get_current_point() is not None
def assert_park(self):
self.assert_in_point("pPark")
def assert_home(self):
self.assert_in_point("pHome")
def assert_gonio(self):
self.assert_in_point("pGonioWait")
def assert_scanner(self):
self.assert_in_point("pScan")
def assert_aux(self):
self.assert_in_point("pAux")
def assert_rt(self):
self.assert_in_point("pDewarTempRT")
def assert_cleared(self):
if not self.is_cleared():
raise Exception("Robot not in cleared position")
def wait_ready(self):
robot.waitState(State.Ready, 1000) #robot.state.assertReady()
if simulation:
add_device(RobotSC("robot","localhost:1000"),force = True)
else:
add_device(RobotSC("robot", "129.129.244.40:1000"), force = True)
robot.set_default_desc(DESC_DEFAULT)
robot.default_speed = 20
robot.set_frame(FRAME_DEFAULT)
robot.setPolling(DEFAULT_ROBOT_POLLING)
update_robot_tool()
robot.get_current_point() #TODO: REMOVE WHEN CURRENT POINT REPORTED BY POLLING MESSAGE