FRAME_TABLE = "fTable" DESC_FAST = "mFast" DESC_SLOW = "mSlow" DESC_SCAN = "mScan" DESC_DEFAULT = DESC_FAST AUX_SEGMENT = "X" RT_SEGMENT = "R" DEFAULT_ROBOT_POLLING = 500 TASK_WAIT_ROBOT_POLLING = 50 run("devices/RobotTCP") simulation = False class RobotSC(RobotTCP): def __init__(self, name, server, timeout = 1000, retries = 1): RobotTCP.__init__(self, name, server, timeout, retries) self.set_tasks([ "mvGonio","putGonio", "getGonio", "recover", "mvPark","mvHome", "mvRT", "mvAux" "getRT", "putRT","getAux","putAux", "mvScanner", "openLid", "closeLid"]) self.set_known_points(["pPark", "pGonioWait", "pGonioG","pHome", "pDewarTempRT", "pAux", "pScan"]) self.setPolling(DEFAULT_ROBOT_POLLING) self.last_command_timestamp = None self.last_command_position = None self.ongoing_task = None #self.gripper_open_value = False #self.setSimulated() def wait_async_motion(self): og_position, og_assert = self.ongoing_task self.wait_task_finished(TASK_WAIT_ROBOT_POLLING) if callable(og_assert): og_assert() self.last_command_position = og_position self.last_command_timestamp = time.time() self.ongoing_task = None def move_rt_async(self): self.start_task('mvRT') self.ongoing_task = ("rt", self.assert_rt) def move_rt(self): #self.start_task('mvRT') #self.wait_task_finished(TASK_WAIT_ROBOT_POLLING) #self.assert_dewar() self.move_rt_async() self.wait_async_motion() def move_home_async(self): self.start_task('mvHome') self.ongoing_task = ("home", self.assert_home) def move_home(self): self.move_home_async() self.wait_async_motion() #self.start_task('mvHome') #self.wait_task_finished(TASK_WAIT_ROBOT_POLLING) #self.assert_home() def get_rt(self, segment, puck, sample, mounting_in_same_segment=False): segment = self.toSegmentNumber(segment) self.start_task('getRT',segment, sample) self.wait_task_finished(TASK_WAIT_ROBOT_POLLING) #self.assert_rt() self.last_command_position = "rt" self.last_command_timestamp = time.time() def put_rt(self, segment, puck, sample, mounting_in_same_segment=False): segment = self.toSegmentNumber(segment) self.assert_home() self.start_task('putRT',segment, sample) self.wait_task_finished(TASK_WAIT_ROBOT_POLLING) #self.assert_rt() self.last_command_position = "rt" self.last_command_timestamp = time.time() def put_gonio(self): pin_offset = get_pin_offset() pin_angle_offset = get_pin_angle_offset() print "Pin offset = " + str(pin_offset) self.start_task('putGonio', pin_offset) self.wait_task_finished(TASK_WAIT_ROBOT_POLLING) self.assert_gonio() if is_valve_controlled(): close_valve() self.last_command_position = "gonio" self.last_command_timestamp = time.time() def get_gonio(self): pin_offset = get_pin_offset() self.start_task('getGonio', pin_offset) self.wait_task_finished(TASK_WAIT_ROBOT_POLLING) self.assert_gonio() self.last_command_position = "gonio" self.last_command_timestamp = time.time() def get_aux(self, sample): self.assert_aux() self.start_task('getAux', sample) self.wait_task_finished(TASK_WAIT_ROBOT_POLLING) self.assert_aux() self.last_command_position = "aux" self.last_command_timestamp = time.time() def put_aux(self, sample): self.assert_aux() self.start_task('putAux', sample) self.wait_task_finished(TASK_WAIT_ROBOT_POLLING) self.assert_aux() self.last_command_position = "aux" self.last_command_timestamp = time.time() def move_scanner(self): self.start_task('mvScanner') self.wait_task_finished(TASK_WAIT_ROBOT_POLLING) self.assert_scanner() self.last_command_position = "scanner" self.last_command_timestamp = time.time() def move_laser(self): self.start_task('mvScanner') self.wait_task_finished(TASK_WAIT_ROBOT_POLLING) self.assert_laser() self.last_command_position = "scanner" self.last_command_timestamp = time.time() #def do_scan(self): # self.start_task('doScan') # self.wait_task_finished(TASK_WAIT_ROBOT_POLLING) # self.assert_scan_stop() def move_gonio_async(self): self.start_task('mvGonio') self.ongoing_task = ("gonio", self.assert_gonio) def move_gonio(self): self.move_gonio_async() self.wait_async_motion() #self.start_task('mvGonio') #self.wait_task_finished(TASK_WAIT_ROBOT_POLLING) #self.assert_gonio() def move_park_async(self): self.start_task('mvPark') self.ongoing_task = ("park", self.assert_park) def move_park(self): self.move_park_async() self.wait_async_motion() #self.start_task('mvPark') #self.wait_task_finished(TASK_WAIT_ROBOT_POLLING) #self.assert_park() def robot_recover(self): self.start_task('recover') self.wait_task_finished(TASK_WAIT_ROBOT_POLLING) self.assert_home() self.last_command_position = "home" self.last_command_timestamp = time.time() def move_aux_async(self): self.start_task('mvAux') self.ongoing_task = ("aux", self.assert_aux) def move_aux(self): self.move_aux_async() self.wait_async_motion() #self.start_task('mvRT') #self.wait_task_finished(TASK_WAIT_ROBOT_POLLING) #self.assert_rt() def open_lid(self, segment): segment = self.toSegmentNumber(segment) self.start_task('openLid', segment) self.wait_task_finished(TASK_WAIT_ROBOT_POLLING) self.assert_home() self.last_command_position = "open_lid" self.last_command_timestamp = time.time() def close_lid(self, segment): segment = self.toSegmentNumber(segment) self.start_task('closeLid', segment) self.wait_task_finished(TASK_WAIT_ROBOT_POLLING) self.assert_home() self.last_command_position = "close_lid" self.last_command_timestamp = time.time() def toSegmentNumber(self, segment): if is_string(segment): segment = ord(segment.upper()) - ord('A') +1 return segment def on_event(self,ev): #print "EVT: " + ev pass def on_change_working_mode(self, working_mode): if get_device("hexiposi") is not None: hexiposi.moved = True #Force image processing on first sample def start_task(self, program, *args, **kwargs): #TODO: Check safe position return RobotTCP.start_task(self, program, *args, **kwargs) def stop_task(self): RobotTCP.stop_task(self) #TODO: Restore safe position def gripper_has_sample(self): """checks gripper switches for presence of sample""" return RT_sample_detection.read() #hasSample, toolOpen = self.eval_bool("diToolHasSample"), self.eval_bool("diToolOpen") #return (not toolOpen) and hasSample def set_remote_mode(self): robot.set_profile("remote") def set_local(self): robot.set_profile("default") def is_park(self): return self.is_in_point("pPark") def is_home(self): return self.is_in_point("pHome") def is_gonio(self): return self.is_in_point("pGonioWait") def is_scanner(self): return self.is_in_point("pScan") def is_aux(self): return self.is_in_point("pAux") def is_rt(self): return self.is_in_point("pDewarTempRT") #def is_scan_stop(self): # return self.is_in_point("pScanStop") def is_cleared(self): #return self.is_home() or self.is_park() or self.is_dewar() or self.is_dewar_home() return self.get_current_point() is not None def assert_park(self): self.assert_in_point("pPark") def assert_home(self): self.assert_in_point("pHome") def assert_gonio(self): self.assert_in_point("pGonioWait") def assert_scanner(self): self.assert_in_point("pScan") def assert_aux(self): self.assert_in_point("pAux") def assert_rt(self): self.assert_in_point("pDewarTempRT") def assert_cleared(self): if not self.is_cleared(): raise Exception("Robot not in cleared position") def wait_ready(self): robot.waitState(State.Ready, 1000) #robot.state.assertReady() if simulation: add_device(RobotSC("robot","localhost:1000"),force = True) else: add_device(RobotSC("robot", "129.129.244.40:1000"), force = True) robot.set_default_desc(DESC_DEFAULT) robot.default_speed = 20 robot.set_frame(FRAME_DEFAULT) robot.setPolling(DEFAULT_ROBOT_POLLING) update_robot_tool() robot.get_current_point() #TODO: REMOVE WHEN CURRENT POINT REPORTED BY POLLING MESSAGE