Multiple remote motion commands are now merged if issued in the same coordinate system

This commit is contained in:
gac-bernina
2024-01-12 17:56:22 +01:00
parent bf75855806
commit cb4b71fff3
3 changed files with 124 additions and 30 deletions

View File

@@ -24,7 +24,7 @@ ROBOT_SPHERICAL_MOTORS = ["r" , "gamma", "delta"]
run("devices/RobotTCP")
simulation = True
simulation = False
class RobotBernina(RobotTCP):
@@ -280,9 +280,9 @@ class RobotBernina(RobotTCP):
for k, v in target_jnt.items():
if v == None:
target_jnt[k] = current_jnt[k]
self.robot.set_jnt(target_jnt, name="tcp_j")
self.set_jnt([target_jnt[k] for k in ["j1", "j2", "j3", "j4", "j5", "j6"]], name="tcp_j")
self.set_motion_queue_empty(False)
self.robot.movej("tcp_j", tool , desc)
self.movej("tcp_j", tool , desc)
def move_home(self):
if not self.is_in_point(P_HOME):
@@ -390,5 +390,5 @@ robot.setPolling(DEFAULT_ROBOT_POLLING)
robot.get_current_point() #TODO: REMOVE WHEN CURRENT POINT REPORTED BY POLLING MESSAGE
robot.set_motors_enabled(True)
robot.set_joint_motors_enabled(True)
robot.motor_to_epics_pvs(robot.cartesian_motors+robot.spherical_motors+robot.joint_motors+robot.linear_axis_motors)
robot.motor_to_epics_pvs(list(robot.cartesian_motors.values())+list(robot.spherical_motors.values())+list(robot.joint_motors.values())+list(robot.linear_axis_motors.values()))
#show_panel(robot)

View File

@@ -1,6 +1,23 @@
import time
import ch.psi.pshell.device.PositionerConfig as PositionerConfig
def on_motion_events(Motor):
def on_combined_motion(self, changed_motors):
s = "Combining " + str(len(changed_motors)) + " motions to new target:\n"
for k, v in changed_motors.items():
s += "{:8} {:6} --> {}\n".format(k,v[0], v[1])
get_context().sendEvent("motion", s)
def on_coordinate_system_change(self, old, new):
s = "User changed coordinate system " + old + " --> " +new + ":\n"
get_context().sendEvent("motion", s)
Motor.on_combined_motion = on_combined_motion
Motor.on_coordinate_system_change = on_coordinate_system_change
return Motor
class LinearAxisMotor(PositionerBase):
def __init__(self, robot, name = "z_lin"):
PositionerBase.__init__(self, name, PositionerConfig())
@@ -34,15 +51,20 @@ class LinearAxisMotor(PositionerBase):
time.sleep(0.1)
robot.evaluate("setEcDo(ecdo_linMotStop,false)")
@on_motion_events
class RobotCartesianMotor(PositionerBase):
def __init__(self, robot, name):
PositionerBase.__init__(self, name, PositionerConfig())
self.robot = robot
@property
def target_coordinates(self):
def target_pos(self):
return {k: m.setpoint for k,m in robot.cartesian_motors.items()}
@target_pos.setter
def target_pos(self, value):
for k,m in robot.cartesian_motors.items():
m.setpoint = value[k] if k in value.keys() else robot.cartesian_pos[k]
#ATTENTION: Always initialize cartesian motors before scanning (or call robot.set_motors_enabled(True))
def doInitialize(self):
self.setpoint = self.doReadReadback()
@@ -55,7 +77,22 @@ class RobotCartesianMotor(PositionerBase):
if self.robot.cartesian_destination is not None:
self.setpoint = value
self.setCache(float(value), None)
robot.move_cartesian(**{self.name:value})
changed_motors = {k: [self.robot.cartesian_pos[k], self.target_pos[k]] for k in self.target_pos.keys() if round(self.robot.cartesian_pos[k], 2) != round(self.target_pos[k], 2)}
if (len(changed_motors) > 1) & (self.robot.last_remote_motion == "cartesian"):
self.robot.evaluate("resetMotion()")
target = self.target_pos
print("moving to combined", target)
self.on_combined_motion(changed_motors)
else:
if self.robot.last_remote_motion != "cartesian":
if not self.robot.empty:
self.robot.evaluate("resetMotion()")
self.on_coordinate_system_change(self.robot.last_remote_motion, "cartesian")
target = {self.name:value}
self.target_pos = target
print("moving to", target)
robot.move_cartesian(**target)
self.robot.last_remote_motion = "cartesian"
def doReadReadback(self):
return float("nan") if len(self.robot.cartesian_pos)==0 else float(self.robot.cartesian_pos[self.name])
@@ -64,19 +101,24 @@ class RobotCartesianMotor(PositionerBase):
robot.stop()
robot.reset_motion()
robot.resume()
def move_done(self):
"""once move done, do readback update and send event and readback"""
@on_motion_events
class RobotSphericalMotor (PositionerBase):
def __init__(self, robot, name):
PositionerBase.__init__(self, name, PositionerConfig())
self.robot = robot
@property
def target_coordinates(self):
def target_pos(self):
return {k: m.setpoint for k,m in robot.spherical_motors.items()}
@target_pos.setter
def target_pos(self, value):
for k,m in robot.spherical_motors.items():
m.setpoint = value[k] if k in value.keys() else robot.spherical_pos[k]
def doInitialize(self):
self.setpoint = self.doReadReadback()
self.setCache(self.doRead(), None)
@@ -88,8 +130,23 @@ class RobotSphericalMotor (PositionerBase):
if self.robot.spherical_destination is not None:
self.setpoint = value
self.setCache(float(value), None)
robot.move_spherical(**{self.name:value})
changed_motors = {k: [self.robot.spherical_pos[k], self.target_pos[k]] for k in self.target_pos.keys() if round(self.robot.spherical_pos[k], 2) != round(self.target_pos[k], 2)}
if (len(changed_motors) > 1) & (self.robot.last_remote_motion == "spherical"):
self.robot.evaluate("resetMotion()")
target = self.target_pos
print("moving to combined", target)
self.on_combined_motion(changed_motors)
else:
if self.robot.last_remote_motion != "spherical":
if not self.robot.empty:
self.robot.evaluate("resetMotion()")
self.on_coordinate_system_change(self.robot.last_remote_motion, "spherical")
target = {self.name:value}
self.target_pos = target
print("moving to", target)
robot.move_spherical(**target)
self.robot.last_remote_motion = "spherical"
def doReadReadback(self):
return float("nan") if len(self.robot.spherical_pos)==0 else robot.spherical_pos[self.name]
@@ -98,16 +155,21 @@ class RobotSphericalMotor (PositionerBase):
robot.reset_motion()
robot.resume()
@on_motion_events
class RobotJointMotor (PositionerBase):
def __init__(self, robot, name):
PositionerBase.__init__(self, name, PositionerConfig())
self.robot = robot
@property
def target_coordinates(self):
def target_pos(self):
return {k: m.setpoint for k,m in robot.joint_motors.items()}
@target_pos.setter
def target_pos(self, value):
print(value)
for k,m in robot.joint_motors.items():
m.setpoint = value[k] if k in value.keys() else robot.joint_pos[k]
def doInitialize(self):
self.setpoint = self.doReadReadback()
self.setCache(self.doRead(), None)
@@ -117,9 +179,23 @@ class RobotJointMotor (PositionerBase):
def doWrite(self, value):
self.setpoint = value
self.robot.move_joint(**{self.name:value})
changed_motors = {k: [self.robot.joint_pos[k], self.target_pos[k]] for k in self.target_pos.keys() if round(self.robot.joint_pos[k], 2) != round(self.target_pos[k], 2)}
if (len(changed_motors) > 1) & (self.robot.last_remote_motion == "joint"):
self.robot.evaluate("resetMotion()")
target = self.target_pos
print("moving to combined", target)
self.on_combined_motion(changed_motors)
else:
if self.robot.last_remote_motion != "joint":
if not self.robot.empty:
self.robot.evaluate("resetMotion()")
self.on_coordinate_system_change(self.robot.last_remote_motion, "joint")
target = {self.name:value}
self.target_pos = target
print("moving to", target)
self.robot.move_joint(**target)
self.robot.last_remote_motion = "joint"
def doReadReadback(self):
return self.robot.get_joint_pos()[self.name] if len(self.robot.joint_pos)==0 else float(self.robot.joint_pos[self.name])

View File

@@ -30,6 +30,7 @@ class RobotTCP(TcpDevice, Stoppable):
self.high_level_tasks = []
self.known_points = []
self.current_points = []
self.last_remote_motion = "None"
self.cartesian_destination = None
self.spherical_destination = None
self.linear_axis_destination = None
@@ -393,20 +394,24 @@ class RobotTCP(TcpDevice, Stoppable):
},
},
}
cur_mode = self.working_mode
prev_status = self.status
prev_mode = self.working_mode
if mode in modestr.keys():
self.working_mode = modestr[mode]["mode"]
self.status = modestr[mode]["status"][status]
else:
self.working_mode = "invalid"
self.status = "invalid"
if self.working_mode != cur_mode:
if self.working_mode != prev_mode:
try:
self.on_change_working_mode(self.working_mode)
self.on_change_working_mode(self.working_mode, prev_mode)
except:
pass
if self.status != prev_status:
try:
self.on_change_status(self.status, prev_status)
except:
pass
def read_working_mode(self):
try:
mode = self.eval_int("workingMode(tcp_a)")
@@ -431,13 +436,14 @@ class RobotTCP(TcpDevice, Stoppable):
return fault
#Motion control
def stop(self):
def stop(self, msg=""):
self.evaluate("stopMove()")
#get_context().sendEvent("stop", "Stop cmd received: stopping robot motions! \n" + msg)
def resume(self):
self.evaluate("restartMove()")
def reset_motion(self, joint=None, timeout=None):
def reset_motion(self, joint=None, timeout=None, msg=""):
self.evaluate("resetMotion()" if (joint is None) else ("resetMotion(" + joint + ")"))
### reinitialize all motors (target values)
if self.cartesian_motors_enabled:
@@ -452,6 +458,7 @@ class RobotTCP(TcpDevice, Stoppable):
if self.joint_motors_enabled:
for m in self.joint_motors.values():
m.doInitialize()
#get_context().sendEvent("reset_motion", "Reset motion cmd received: Cancelling all stored movement commands on controller! \n" + msg)
def is_empty(self):
self.empty = self.eval_bool("isEmpty()")
@@ -837,8 +844,8 @@ class RobotTCP(TcpDevice, Stoppable):
if self.state != State.Offline:
print >> sys.stderr, "Update error: " + str(sys.exc_info()[1])
self.setState(State.Offline)
get_context().sendEvent("polling", self.on_poll_status)
self.on_polling()
def doUpdate_backup(self):
try:
@@ -1272,12 +1279,23 @@ class RobotTCP(TcpDevice, Stoppable):
task = self.check_task()
if task != None:
raise Exception("Ongoing task: " + task)
def on_polling(self):
get_context().sendEvent("polling", self.on_poll_status)
def on_event(self,ev):
pass
def on_change_working_mode(self, working_mode):
def on_change_working_mode(self, working_mode, prev_working_mode):
pass
def on_change_status(self, status, prev_status):
if status in ["joint", "cartesian_frame", "cartesian_tool", "point"]:
msg = "Manual mode change to " + status + " mode detected"
print(msg)
self.reset_motion(msg=msg)
get_context().sendEvent("reset_motion", "Cancelling all stored movement commands on controller! \n" + msg)
pass
def on_task_finished(self, task, ret):
pass