754 lines
32 KiB
Python
754 lines
32 KiB
Python
"""
|
|
--> eco has to take care of using the correctkeywords (i.e. r instead of t_det)
|
|
--> recording can be done with robot.record() similar to robot.simulate() taking the usual kwargs, issuing either a linear, joint or cartesian motion with record = True flag.
|
|
the flag then saves coordinates, tool, frame temporarily, starts motion command in thread, adds callback function to send an event, do an interpolation and save the interpolated values to save coordinates,
|
|
--> any motion command will automatically rset the motion
|
|
--> resent_motion should cancel the recording task / kill it
|
|
rename save_coordinates to checked / tested or so.
|
|
"""
|
|
|
|
|
|
import math
|
|
PVBASE = "SARES20-ROB:"
|
|
KNOWN_POINTS = P_PARK, P_HOME = "park", "home"
|
|
TASKS = MOVE_PARK, MOVE_HOME, TWEAK_X , TWEAK_Y = "movePark", "moveHome", "tweakX", "tweakY"
|
|
|
|
DESCS = DESC_FAST,DESC_SLOW, = "mNomSpeed", "mNomSpeed"
|
|
DESC_DEFAULT = DESCS[0]
|
|
|
|
FLANGE = "flange"
|
|
|
|
TOOLS = ["t_JF01T03", "t_JF07T32", ]
|
|
#TOOLS = ["t_Detector",]
|
|
TOOL_DEFAULT = TOOL_DET = TOOLS[0]
|
|
|
|
FRAMES = ["f_actualFrame", "f_direct", "f_2mRad", "f_4mRad", "f_6mRad", "f_8mRad"]
|
|
#FRAMES = ["f_actualFrame", "f_Experiment1", "f_Experiment2", "f_Experiment3"]
|
|
FRAME_DEFAULT = FRAMES[1]
|
|
|
|
DEFAULT_ROBOT_POLLING = 200
|
|
TASK_WAIT_ROBOT_POLLING = 50
|
|
DEFAULT_SPEED=100
|
|
|
|
ROBOT_JOINT_MOTORS = ["j1" , "j2", "j3", "j4", "j5", "j6"]
|
|
ROBOT_CARTESIAN_MOTORS = ["x" , "y", "z", "rx", "ry", "rz"]
|
|
ROBOT_SPHERICAL_MOTORS = ["r" , "gamma", "delta"]
|
|
|
|
run("devices/RobotTCP")
|
|
|
|
override_remote_safety = False
|
|
simulation = False
|
|
|
|
def dimensionality_decorator(func):
|
|
def decorated(self, *args):
|
|
if hasattr(self[0], "__len__"):
|
|
return Array([func(s_, *args) for s_ in self])
|
|
else:
|
|
return Array(func(self, *args))
|
|
return decorated
|
|
|
|
def indexing_decorator(func):
|
|
def decorated(self, v, *args):
|
|
nn = len(shape(v))
|
|
if (nn ==1):
|
|
if (type(v[0]) is bool):
|
|
return Array([s_ for s_, v_ in zip(self,v) if v_])
|
|
else:
|
|
return Array([self[v_] for v_ in v])
|
|
elif nn ==2:
|
|
if type(v[0]) is bool:
|
|
return Array([[s_ for s_, v_ in zip(self[n],v[n]) if v_ ] for n in range(nn)])
|
|
else:
|
|
ret = func(self, v, *args)
|
|
if hasattr(ret, "__len__"):
|
|
return Array(ret)
|
|
else:
|
|
return ret
|
|
return decorated
|
|
|
|
class Array(list):
|
|
def __init__(self, data):
|
|
super(Array, self).__init__(data)
|
|
self.getint = super(Array, self).__getitem__
|
|
@dimensionality_decorator
|
|
def __add__(self, v):
|
|
return add(self, v)
|
|
@dimensionality_decorator
|
|
def __sub__(self, v):
|
|
return sub(self, v)
|
|
@dimensionality_decorator
|
|
def __mul__(self, v):
|
|
return mul(self, v)
|
|
@dimensionality_decorator
|
|
def __div__(self, v):
|
|
return div(self, v)
|
|
@dimensionality_decorator
|
|
def interpolate(self, num):
|
|
return interpolate(self, num)
|
|
@dimensionality_decorator
|
|
def __lt__(self, v):
|
|
return lower_than(self, v)
|
|
@dimensionality_decorator
|
|
def __le__(self, v):
|
|
return lower_equal(self, v)
|
|
@dimensionality_decorator
|
|
def __gt__(self, v):
|
|
return greater_than(self, v)
|
|
@dimensionality_decorator
|
|
def __ge__(self, v):
|
|
return greater_equal(self, v)
|
|
@indexing_decorator
|
|
def __getitem__(self, v):
|
|
return super(Array, self).__getitem__(v)
|
|
@property
|
|
def shape(self):
|
|
return shape(self)
|
|
@property
|
|
def abs(self):
|
|
return arr_abs(self)
|
|
@property
|
|
def T(self):
|
|
return Array(transpose(self))
|
|
### math helper functions
|
|
def shape(self):
|
|
o = self
|
|
s = []
|
|
while hasattr(o, "__len__"):
|
|
s = s+[len(o)]
|
|
o = o[0]
|
|
return s
|
|
|
|
def transpose(a):
|
|
mm = len(a[0])
|
|
a_t = [[v[m] for v in a] for m in range(mm)]
|
|
return a_t
|
|
|
|
def arr_abs(a):
|
|
n = a.shape
|
|
if len(n)==1:
|
|
a_abs = [abs(e) for e in a]
|
|
elif len(n)==2:
|
|
a_abs = [[abs(e) for e in v] for v in a]
|
|
return Array(a_abs)
|
|
|
|
def interpolate(v,points_per_interval, round_to=2):
|
|
v_i=[[round(v[m]+1.*n/points_per_interval*(v[m+1]-v[m]), round_to) for n in range(points_per_interval)] for m in range(len(v)-1)]
|
|
ret = []
|
|
for i in v_i:
|
|
ret=ret+i
|
|
ret=ret+[v[-1]]
|
|
return ret
|
|
|
|
def add(list1, v):
|
|
if hasattr(v, "__len__"):
|
|
return [l+v_ for l, v_ in zip(list1, v)]
|
|
else:
|
|
return [l+v for l in list1]
|
|
|
|
def sub(list1, v):
|
|
if hasattr(v, "__len__"):
|
|
return [l-v_ for l, v_ in zip(list1, v)]
|
|
else:
|
|
return [l-v for l in list1]
|
|
|
|
def mul(list1, v):
|
|
if hasattr(v, "__len__"):
|
|
return [l*v_ for l, v_ in zip(list1, v)]
|
|
else:
|
|
return [l*v for l in list1]
|
|
|
|
def div(list1, v):
|
|
if hasattr(v, "__len__"):
|
|
return [1.*l/v_ for l, v_ in zip(list1, v)]
|
|
else:
|
|
return [1.*l/v for l in list1]
|
|
|
|
def lower_than(self, v):
|
|
if hasattr(v, "__len__"):
|
|
return [v_>s_ for v_, s_ in zip(v, self)]
|
|
else:
|
|
return [v>s_ for s_ in self]
|
|
|
|
def lower_equal(self, v):
|
|
if hasattr(v, "__len__"):
|
|
return [v_>=s_ for v_, s_ in zip(v, self)]
|
|
else:
|
|
return [v>=s_ for s_ in self]
|
|
|
|
def greater_than(self, v):
|
|
if hasattr(v, "__len__"):
|
|
return [s_>v_ for v_, s_ in zip(v, self)]
|
|
else:
|
|
return [s_>v for s_ in self]
|
|
|
|
def greater_equal(self, v):
|
|
if hasattr(v, "__len__"):
|
|
return [s_>=v_ for v_, s_ in zip(v, self)]
|
|
else:
|
|
return [s_>=v for s_ in self]
|
|
|
|
class RobotBernina(RobotTCP):
|
|
def __init__(self, name, server, timeout = 1000, retries = 1):
|
|
RobotTCP.__init__(self, name, server, timeout, retries)
|
|
self.invert_detector_rz = False
|
|
self.set_tasks(TASKS)
|
|
self.set_known_points(KNOWN_POINTS)
|
|
self.setPolling(DEFAULT_ROBOT_POLLING)
|
|
self.last_command_timestamp = None
|
|
self.last_command_position = None
|
|
self.override_remote_safety = override_remote_safety
|
|
temp = {"frame": Array([]), "tool": Array([]), "pos": Array([])}
|
|
self.remote_allowed_recorded = {"spherical": temp, "cartesian": temp, "joint": temp}
|
|
self.remote_allowed_deltas = {
|
|
"spherical": {'delta': 1., 'r': 50., 'gamma': 1.},
|
|
"cartesian": {'rx': 1., 'ry': 1., 'rz': 1., 'x': 1., 'y': 1., 'z': 1.},
|
|
"joint": {'j1': 1., 'j2': 1., 'j3': 1., 'j4': 1., 'j5': 1., 'j6': 1.},
|
|
}
|
|
|
|
def set_override_remote_safety(self, value):
|
|
self.override_remote_safety = bool(value)
|
|
|
|
def set_invert_detector_rz(self, value):
|
|
self.invert_detector_rz = bool(value)
|
|
|
|
def remote_allowed(self, from_coordinates, to_coordinates, frame_trsf, tool_trsf, motion):
|
|
if self.override_remote_safety:
|
|
return True
|
|
ks = ROBOT_CARTESIAN_MOTORS if motion == "cartesian" else ROBOT_SPHERICAL_MOTORS if motion == "spherical" else ROBOT_JOINT_MOTORS
|
|
# spherical motors are missing rx ry rz, maybe even in simulation
|
|
a1 = [from_coordinates[k] for k in ks]
|
|
a2 = [to_coordinates[k] for k in ks]
|
|
ds = [self.remote_allowed_deltas[motion][k] for k in ks]
|
|
cds = [self.remote_allowed_deltas[motion][k] for k in ks]
|
|
rc = self.remote_allowed_recorded[motion]
|
|
if len(rc["pos"]) > 0:
|
|
#preselection by frame with concatenated arrays
|
|
if "frame" in rc.keys():
|
|
fr = rc["frame"]
|
|
idxs = (fr-frame_trsf).abs < cds
|
|
idxs_b =[all(idx) for idx in idxs]
|
|
rc = {"pos": rc["pos"][idxs_b], "tool": rc["tool"][idxs_b]}
|
|
if len(rc["pos"]) > 0:
|
|
if "tool" in rc.keys():
|
|
tr = rc["tool"]
|
|
idxs = (tr-tool_trsf).abs < cds
|
|
idxs_b =[all(idx) for idx in idxs]
|
|
rc = {"pos": rc["pos"][idxs_b]}
|
|
if len(rc["pos"]) > 0:
|
|
for tr in rc["pos"]:
|
|
idxs1 = (tr-a1).abs < ds
|
|
idxs2 = (tr-a2).abs < ds
|
|
idx1 = [all(idx) for idx in idxs1]
|
|
idx2 = [all(idx) for idx in idxs2]
|
|
if (any(idx1)) & (any(idx2)):
|
|
return True
|
|
return False
|
|
|
|
def general_motion(self, **kwargs):
|
|
## check motion type
|
|
self.reset_motion()
|
|
if (sum([m in kwargs.keys() for m in ROBOT_JOINT_MOTORS])>0) & (sum([m in kwargs.keys() for m in ROBOT_SPHERICAL_MOTORS])==0) & (sum([m in kwargs.keys() for m in ROBOT_CARTESIAN_MOTORS])==0):
|
|
move = self.move_joint
|
|
elif (sum([m in kwargs.keys() for m in ROBOT_CARTESIAN_MOTORS])>0) & (sum([m in kwargs.keys() for m in ROBOT_SPHERICAL_MOTORS])==0) & (sum([m in kwargs.keys() for m in ROBOT_JOINT_MOTORS])==0):
|
|
move = self.move_cartesian
|
|
elif (sum([m in kwargs.keys() for m in ROBOT_SPHERICAL_MOTORS])>0) & (sum([m in kwargs.keys() for m in ROBOT_CARTESIAN_MOTORS])==0) & (sum([m in kwargs.keys() for m in ROBOT_JOINT_MOTORS])==0):
|
|
move = self.move_spherical
|
|
else:
|
|
get_context().sendEvent("motion", "\nNo unique coordinate system found for these motions use only unique keywords out of:\n {}\n {}\n {}.".format(ROBOT_JOINT_MOTORS, ROBOT_CARTESIAN_MOTORS, ROBOT_SPHERICAL_MOTORS))
|
|
return
|
|
self.last_remote_motion = "recording"
|
|
move(sync = True, **kwargs)
|
|
return True
|
|
|
|
def record_motion(self, **kwargs):
|
|
## check motion type
|
|
self.reset_motion(msg = "RECORDING TRAJECTORY TO LIST OF ALLOWED REMOTE MOTIONS - USE HANDHELD TO FINISH MOTION")
|
|
if self.working_mode != "manual":
|
|
get_context().sendEvent("motion", "Current working mode is {}. Change it to manual using the keyswitch to record motions.".format(self.working_mode))
|
|
return True
|
|
if (sum([m in kwargs.keys() for m in ROBOT_JOINT_MOTORS])>0) & (sum([m in kwargs.keys() for m in ROBOT_SPHERICAL_MOTORS])==0) & (sum([m in kwargs.keys() for m in ROBOT_CARTESIAN_MOTORS])==0):
|
|
k = "joint"
|
|
move = self.move_joint
|
|
elif (sum([m in kwargs.keys() for m in ROBOT_CARTESIAN_MOTORS])>0) & (sum([m in kwargs.keys() for m in ROBOT_SPHERICAL_MOTORS])==0) & (sum([m in kwargs.keys() for m in ROBOT_JOINT_MOTORS])==0):
|
|
k = "cartesian"
|
|
move = self.move_cartesian
|
|
elif (sum([m in kwargs.keys() for m in ROBOT_SPHERICAL_MOTORS])>0) & (sum([m in kwargs.keys() for m in ROBOT_CARTESIAN_MOTORS])==0) & (sum([m in kwargs.keys() for m in ROBOT_JOINT_MOTORS])==0):
|
|
k = "spherical"
|
|
move = self.move_spherical
|
|
else:
|
|
get_context().sendEvent("motion", "\nNo unique coordinate system found for these motions use only unique keywords out of:\n {}\n {}\n {}.".format(ROBOT_JOINT_MOTORS, ROBOT_CARTESIAN_MOTORS, ROBOT_SPHERICAL_MOTORS))
|
|
return
|
|
if k == "spherical":
|
|
interp = Array([robot.cart2sph(return_dict=False, **{k:e for k,e in zip(ROBOT_CARTESIAN_MOTORS,v)}) for v in move(simulate=True, coordinates = "cartesian", **kwargs)]).T.interpolate(10).T
|
|
else:
|
|
interp = Array(move(simulate=True, coordinates = k, **kwargs)).T.interpolate(10).T
|
|
self.last_remote_motion = "recording"
|
|
move(sync = True, **kwargs)
|
|
frame = Array(self.frame_trsf)
|
|
tool = Array(self.tool_trsf)
|
|
self.remote_allowed_recorded[k]["frame"].append(frame)
|
|
self.remote_allowed_recorded[k]["tool"].append(tool)
|
|
self.remote_allowed_recorded[k]["pos"].append(interp)
|
|
self.on_recording_finished()
|
|
return True
|
|
|
|
def on_recording_finished(self):
|
|
get_context().sendEvent("motion", "Finished, recording")
|
|
|
|
def set_motion_queue_empty(self, state=False):
|
|
### update the status of the robot, the queue of motion commands
|
|
### the status is polled automatically from the controller but this setting command does not have a delay
|
|
self.empty = state
|
|
self._update_state()
|
|
|
|
def check_software_limits(self, j1=0, j2=0, j3=0, j4=0, j5=0, j6=0):
|
|
j = [j1,j2,j3,j4,j5,j6]
|
|
self.set_jnt(j, name="tcp_j")
|
|
self.evaluate("tcp_b=isInRange(tcp_j)")
|
|
return self.get_bool(name="tcp_b")
|
|
|
|
def deg2rad(self, angles):
|
|
return [a/180.*math.pi for a in angles]
|
|
|
|
def rad2deg(self, angles):
|
|
return [a*180./math.pi for a in angles]
|
|
|
|
def get_spherical_pos(self, frame = None, return_dict=True):
|
|
cur_cart = self.get_cartesian_pos(frame = frame, return_dict=True)
|
|
cur_sph = self.cart2sph(return_dict=return_dict, **cur_cart)
|
|
return cur_sph
|
|
|
|
def update_spherical_pos(self):
|
|
self.spherical_pos=self.cart2sph(return_dict=True, **self.cartesian_pos)
|
|
|
|
def get_movel_interpolation(self, i, coordinates="joint", idx_pnt1=0, idx_pnt2=1):
|
|
"""
|
|
coordinates: "joint" or "cartesian"
|
|
returns the interpolated position in joint or cartesian coordinates at fraction i
|
|
of the linear motion from point tcp_p_spherical[idx_pnt1] to tcp_p_spherical[idx_pnt2].
|
|
|
|
Note: i=0 returns the start point, i=1 the end point of the motion
|
|
"""
|
|
a = self.execute("get_movel_interpolation", i, coordinates, idx_pnt1, idx_pnt2)
|
|
ret = []
|
|
for i in range(6): ret.append(float(a[i]))
|
|
return ret
|
|
|
|
def get_movec_interpolation(self, i, coordinates="joint"):
|
|
"""
|
|
coordinates: "joint" or "cartesian"
|
|
returns the interpolated position in joint or cartesian coordinates at fraction i
|
|
of the circular motion along the circle given by the start point tcp_p_spherical[1],
|
|
the intermediate point tcp_p_spherical[2] and the target tcp_p_spherical[3].
|
|
|
|
Note: i=0 returns the start point, i=1 the end point of the motion
|
|
"""
|
|
a = self.execute("get_movec_interpolation", i, coordinates)
|
|
ret = []
|
|
for i in range(6): ret.append(float(a[i]))
|
|
return ret
|
|
|
|
def sph2cart(self, r=None, gamma=None, delta=None, return_dict=True, **kwargs):
|
|
gamma, delta = self.deg2rad([gamma, delta])
|
|
x = r*math.cos(delta)*math.sin(gamma)
|
|
y = r*math.sin(delta)
|
|
z = r*math.cos(delta)*math.cos(gamma)
|
|
rz = -math.asin(-math.sin(-math.sin(gamma)*math.sin(-delta)*math.sqrt(1/(math.cos(-delta)**2+(math.sin(gamma)*math.sin(-delta))**2))))
|
|
rx, ry, rz = self.rad2deg([-delta,+gamma, rz])
|
|
if self.invert_detector_rz:
|
|
rz = rz+180
|
|
if abs(rz/180) > 0:
|
|
rz = rz - (rz+180)/360*360
|
|
if return_dict:
|
|
ret = {"x": x, "y": y, "z": z, "rx": rx, "ry": ry, "rz":rz}
|
|
else:
|
|
ret = [x, y, z, rx, ry, rz]
|
|
return ret
|
|
|
|
def cart2sph(self, x=None, y=None, z=None, return_dict=True, **kwargs):
|
|
r = math.sqrt(x**2 + y**2 + z**2)
|
|
if z**2+x**2>0:
|
|
gamma = math.copysign(1, x)*math.acos(z/math.sqrt(z**2+x**2))
|
|
else:
|
|
ry = kwargs["ry"] if "ry" in kwargs.keys() else self.cartesian_pos["ry"]
|
|
gamma, = self.deg2rad([ry])
|
|
delta = math.asin(y/r)
|
|
gamma, delta = self.rad2deg([gamma, delta])
|
|
if return_dict:
|
|
ret = {"r": r, "gamma": gamma, "delta": delta}
|
|
else:
|
|
ret = [r,gamma, delta]
|
|
return ret
|
|
|
|
def check_calculations(self):
|
|
sph = self.get_spherical_pos()
|
|
cart_calc = self.sph2cart(**sph)
|
|
cart = self.get_cartesian_pos(return_dict=True)
|
|
print("spherical", sph)
|
|
print("cart_calc", cart_calc)
|
|
print("cart", cart)
|
|
|
|
err = {k: cart[k]-cart_calc[k] for k in cart_calc.keys()}
|
|
return err
|
|
|
|
def point_reachable(self, point,tool=None, verbose=True):
|
|
if tool == None:
|
|
tool = self.tool
|
|
self.herej()
|
|
reachable = self.eval_bool("pointToJoint(" + tool + ",tcp_j," + point + ",j)")
|
|
if verbose:
|
|
if not reachable:
|
|
get_context().sendEvent("motion", "Point is not in range")
|
|
print("Point is not in range")
|
|
return reachable
|
|
|
|
def simulate_stored_commands(self):
|
|
if self.last_remote_motion == "None":
|
|
ret = None
|
|
elif self.last_remote_motion == "cartesian":
|
|
target = self.cartesian_motors["x"].target_pos
|
|
ret = self.move_cartesian(simulate = True, **target)
|
|
elif self.last_remote_motion == "spherical":
|
|
target = self.spherical_motors["r"].target_pos
|
|
ret = self.move_spherical(simulate = True, **target)
|
|
elif self.last_remote_motion == "joint":
|
|
target = self.joint_motors["j1"].target_pos
|
|
ret = self.move_joint(simulate = True, **target)
|
|
return ret
|
|
|
|
def move_cartesian(self, x=None, y=None, z=None, rx=None, ry=None, rz=None, tool=None, frame=None, desc=None, sync=False, simulate=False, coordinates="joint"):
|
|
"""
|
|
Motion in the cartesian coordinate system using a linear motion movel command to
|
|
move from point tcp_p_spherical[0] to tcp_p_spherical[1].
|
|
|
|
frame: string
|
|
The frame defines the cartesian coordinate system and origin. Must exist on the controller.
|
|
|
|
sync: True or False
|
|
If true, no further commands are accepted until the motion is finished.
|
|
|
|
simulate: True or False
|
|
coordinates: "joint" or "cartesian"
|
|
If simulate = True, an array of interpolated positions in either joint or cartesian
|
|
coordinates is returned. Setting coordinates only has an effect, when the motion is simulated.
|
|
"""
|
|
|
|
sim = []
|
|
if frame == None:
|
|
frame = self.frame
|
|
if tool == None:
|
|
tool = self.tool
|
|
if desc == None:
|
|
desc = self.default_desc
|
|
self.set_frame(frame, name="tcp_f_spherical", change_default=False)
|
|
target_cart = {"x":x, "y":y, "z":z, "rx":rx, "ry":ry, "rz":rz}
|
|
current_cart = self.get_cartesian_pos(frame = frame, return_dict=True)
|
|
for k, v in target_cart.items():
|
|
if v == None:
|
|
target_cart[k] = current_cart[k]
|
|
if (self.working_mode == "remote") & (~simulate):
|
|
if not self.remote_allowed(current_cart, target_cart, frame_trsf=self.get_trsf(frame+".trsf"), tool_trsf=self.get_trsf(tool+".trsf"), motion="cartesian"):
|
|
get_context().sendEvent("motion", "Requested remote motion is not in recorded trajectories. Use rob.record_motion(*args) to record the motion in manual working mode first.")
|
|
return False
|
|
self.cartesian_destination = target_cart
|
|
self.set_pnt([current_cart[k] for k in ["x", "y", "z", "rx", "ry", "rz"]], name="tcp_p_spherical[0]")
|
|
self.set_pnt([target_cart[k] for k in ["x", "y", "z", "rx", "ry", "rz"]], name="tcp_p_spherical[1]")
|
|
if self.point_reachable("tcp_p_spherical[1]", verbose=True):
|
|
if simulate:
|
|
for i in range(11):
|
|
sim.append(self.get_movel_interpolation(i/10., coordinates=coordinates))
|
|
else:
|
|
self.set_motion_queue_empty(False)
|
|
ret = self.movel("tcp_p_spherical[1]", tool=tool, desc=None, sync=sync)
|
|
else:
|
|
return False
|
|
if simulate:
|
|
return sim
|
|
else:
|
|
return ret
|
|
|
|
def move_spherical(self, r=None, gamma=None, delta=None, tool=None, frame=None, desc=None, sync=False, simulate=False, coordinates="joint"):
|
|
"""
|
|
Motion in the spherical coordinate system using a linear moteion movel command to
|
|
change the radius from point tcp_p_spherical[0] to tcp_p_spherical[1], followed
|
|
by a circular motion along the circle given by the start point tcp_p_spherical[1],
|
|
the intermediate point tcp_p_spherical[2] and the target tcp_p_spherical[3].
|
|
|
|
frame: string
|
|
The frame defines the cartesian coordinate system and origin. Must exist on the controller.
|
|
|
|
sync: True or False
|
|
If true, no further commands are accepted until the motion is finished.
|
|
|
|
simulate: True or False
|
|
coordinates: "joint" or "cartesian"
|
|
If simulate = True, an array of interpolated positions in either joint or cartesian
|
|
coordinates is returned. Setting coordinates only has an effect, when the motion is simulated.
|
|
"""
|
|
sim = []
|
|
if frame == None:
|
|
frame = self.frame
|
|
if tool == None:
|
|
tool = self.tool
|
|
if desc == None:
|
|
desc = self.default_desc
|
|
self.set_frame(frame, name="tcp_f_spherical", change_default=False)
|
|
current_cart = self.get_cartesian_pos(frame = frame, return_dict=True)
|
|
current_sph = self.get_spherical_pos(frame = frame, return_dict=True)
|
|
target_sph = {"r": r, "gamma": gamma, "delta": delta}
|
|
self.spherical_destination = target_sph
|
|
for k, v in target_sph.items():
|
|
if v == None:
|
|
target_sph[k] = current_sph[k]
|
|
if (self.working_mode == "remote") & (~simulate):
|
|
if not self.remote_allowed(current_sph, target_sph, frame_trsf=self.get_trsf(frame+".trsf"), tool_trsf=self.get_trsf(tool+".trsf"), motion="spherical"):
|
|
get_context().sendEvent("motion", "Requested remote motion is not in recorded trajectories. Use rob.record_motion(*args) to record the motion in manual working mode first.")
|
|
return False
|
|
# First only move in the radial direction
|
|
target_sph_linear = current_sph.copy()
|
|
target_sph_linear["r"] = target_sph["r"]
|
|
target_cart_linear = current_cart.copy()
|
|
tmp = self.sph2cart(return_dict=True, **target_sph_linear)
|
|
target_cart_linear.update({k:tmp[k] for k in ["x", "y", "z"]})
|
|
self.set_pnt([current_cart[k] for k in ["x", "y", "z", "rx", "ry", "rz"]], name="tcp_p_spherical[0]")
|
|
self.set_pnt([target_cart_linear[k] for k in ["x", "y", "z", "rx", "ry", "rz"]], name="tcp_p_spherical[1]")
|
|
if (self.point_reachable("tcp_p_spherical[1]"))&(self.distance_p("tcp_p_spherical[0]", "tcp_p_spherical[1]")>.1):
|
|
if simulate:
|
|
for i in range(11):
|
|
sim.append(self.get_movel_interpolation(i/10., coordinates=coordinates))
|
|
else:
|
|
self.set_motion_queue_empty(False)
|
|
ret = self.movel("tcp_p_spherical[1]", tool=tool, desc=None, sync=sync)
|
|
print("moving radius")
|
|
|
|
# Next move angles
|
|
target_cart = current_cart.copy()
|
|
target_cart.update(self.sph2cart(return_dict=True, **target_sph))
|
|
self.cartesian_destination = [target_cart[k] for k in ["x", "y", "z", "rx", "ry", "rz"]]
|
|
self.set_pnt([target_cart[k] for k in ["x", "y", "z", "rx", "ry", "rz"]], name="tcp_p_spherical[3]")
|
|
|
|
intermediate_cart = current_cart.copy()
|
|
intermediate_sph = target_sph.copy()
|
|
intermediate_sph.update({
|
|
"gamma": (target_sph["gamma"]+current_sph["gamma"])/2,
|
|
"delta": (target_sph["delta"]+current_sph["delta"])/2
|
|
})
|
|
intermediate_cart.update(self.sph2cart(return_dict=True, **intermediate_sph))
|
|
self.set_pnt([intermediate_cart[k] for k in ["x", "y", "z", "rx", "ry", "rz"]], name="tcp_p_spherical[2]")
|
|
distance = self.distance_p("tcp_p_spherical[1]", "tcp_p_spherical[3]")
|
|
|
|
## check if current point is on spherical trajectory (rx, ry, rz)
|
|
calculated_cartesian_starting_pos = self.sph2cart(**self.cart2sph(**target_cart_linear))
|
|
is_spherical_starting_point = [abs(target_cart_linear[k] - calculated_cartesian_starting_pos[k]) < 1. for k in ["x", "y", "z"]]
|
|
is_spherical_starting_point = is_spherical_starting_point + [abs(abs(abs(target_cart_linear[k] - calculated_cartesian_starting_pos[k])-180)-180) < 1. for k in ["rx", "ry", "rz"]]
|
|
print(target_cart_linear)
|
|
print(calculated_cartesian_starting_pos)
|
|
print(is_spherical_starting_point)
|
|
is_spherical_starting_point=all(is_spherical_starting_point)
|
|
|
|
if (self.point_reachable("tcp_p_spherical[3]"))&(distance>.1):
|
|
if (self.point_reachable("tcp_p_spherical[2]"))& (distance>1) & is_spherical_starting_point:
|
|
if simulate:
|
|
if len(sim)==0:
|
|
for i in range(11):
|
|
sim.append(self.get_movec_interpolation(i/10., coordinates=coordinates))
|
|
else:
|
|
for i in range(10):
|
|
sim.append(self.get_movec_interpolation(i/10.+0.1, coordinates=coordinates))
|
|
else:
|
|
ret = self.movec("tcp_p_spherical[2]", "tcp_p_spherical[3]", tool=None, desc=None, sync=sync)
|
|
print("moving angle circle")
|
|
else:
|
|
print("moving angle linear")
|
|
if simulate:
|
|
if len(sim)==0:
|
|
for i in range(11):
|
|
sim.append(self.get_movel_interpolation(i/10., coordinates=coordinates, idx_pnt1=1, idx_pnt2=3))
|
|
else:
|
|
for i in range(10):
|
|
sim.append(self.get_movel_interpolation(i/10.+0.1, coordinates=coordinates, idx_pnt1=1, idx_pnt2=3))
|
|
else:
|
|
self.set_motion_queue_empty(False)
|
|
ret = self.movel("tcp_p_spherical[3]", tool=None, desc=None, sync=sync)
|
|
if simulate:
|
|
return sim
|
|
|
|
def move_joint(self, j1=None, j2=None, j3=None, j4=None, j5=None, j6=None, tool=None, desc=None, sync=False, simulate=False, coordinates="joint"):
|
|
# Simulation not implemented yet
|
|
if desc == None:
|
|
desc = self.default_desc
|
|
if tool == None:
|
|
tool = self.tool
|
|
target_jnt = {"j1": j1, "j2": j2, "j3": j3, "j4": j4, "j5": j5, "j6": j6}
|
|
current_jnt = self.get_joint_pos(return_dict=True)
|
|
for k, v in target_jnt.items():
|
|
if v == None:
|
|
target_jnt[k] = current_jnt[k]
|
|
if (self.working_mode == "remote") & (~simulate):
|
|
if not self.remote_allowed(current_jnt, target_jnt, frame_trsf=None, tool_trsf=None, motion="joint"):
|
|
get_context().sendEvent("motion", "Requested remote motion is not in recorded trajectories. Use rob.record_motion(*args) to record the motion in manual working mode first.")
|
|
return False
|
|
if simulate:
|
|
sim = []
|
|
cur_joint = [current_jnt[k] for k in ["j1", "j2", "j3", "j4", "j5", "j6"]]
|
|
delta_joint = [target_jnt[k]-current_jnt[k] for k in ["j1", "j2", "j3", "j4", "j5", "j6"]]
|
|
for i in range(11):
|
|
sim.append([j0+dj*i/10 for j0, dj in zip(cur_joint, delta_joint)])
|
|
else:
|
|
self.set_jnt([target_jnt[k] for k in ["j1", "j2", "j3", "j4", "j5", "j6"]], name="tcp_j")
|
|
self.set_motion_queue_empty(False)
|
|
self.movej("tcp_j", tool=tool , desc=desc, sync=sync)
|
|
if simulate:
|
|
return sim
|
|
|
|
def move_home(self):
|
|
if not self.is_in_point(P_HOME):
|
|
self.start_task(MOVE_HOME)
|
|
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
|
|
self.assert_home()
|
|
|
|
def move_park(self):
|
|
if not self.is_in_point(P_PARK):
|
|
self.start_task(MOVE_PARK)
|
|
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
|
|
self.assert_park()
|
|
|
|
def tweak_x(self, offset):
|
|
self.start_task(TWEAK_X, offset)
|
|
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
|
|
|
|
def set_powered(self, value):
|
|
if value:
|
|
self.enable()
|
|
if not value:
|
|
self.disable()
|
|
|
|
def tweak_y(self, offset):
|
|
self.start_task(TWEAK_Y, offset)
|
|
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
|
|
|
|
def on_event(self,ev):
|
|
#print "EVT: " + ev
|
|
pass
|
|
|
|
def on_task_finished(self, task, ret):
|
|
if self.isSimulated():
|
|
if ret is not None and ret <0:
|
|
if task==MOVE_HOME: self.simulated_point=P_HOME
|
|
elif task==MOVE_PARK: self.simulated_point=P_PARK
|
|
else: self.simulated_point=None
|
|
|
|
def doUpdate(self):
|
|
#start = time.time()
|
|
RobotTCP.doUpdate(self)
|
|
|
|
def start_task(self, program, *args, **kwargs):
|
|
#TODO: Check safe position
|
|
self.task_name = program
|
|
self.task_args = args
|
|
self.task_kwargs = kwargs
|
|
ret = RobotTCP.start_task(self, program, *args, **kwargs)
|
|
|
|
|
|
def stop_task(self):
|
|
RobotTCP.stop_task(self)
|
|
#TODO: Restore safe position
|
|
|
|
|
|
def set_remote_mode(self):
|
|
self.set_profile("remote")
|
|
|
|
def set_local(self):
|
|
self.set_profile("default")
|
|
|
|
def is_park(self):
|
|
return self.is_in_point(P_PARK)
|
|
|
|
def is_home(self):
|
|
return self.is_in_point(P_HOME)
|
|
|
|
def is_cleared(self):
|
|
return self.get_current_point() is not None
|
|
|
|
def assert_park(self):
|
|
self.assert_in_point(P_PARK)
|
|
|
|
def assert_home(self):
|
|
self.assert_in_point(P_HOME)
|
|
|
|
def assert_cleared(self):
|
|
if not self.is_cleared():
|
|
raise Exception("Robot not in cleared position")
|
|
|
|
def wait_ready(self):
|
|
self.waitState(State.Ready, 1000) #robot.state.assertReady()
|
|
|
|
def motor_to_epics_pvs(self, motors=[]):
|
|
self.epics_pvs = {str("SARES20-ROB:" + motor.name + ".VAL").upper(): CAS(str("SARES20-ROB:" + motor.name + ".VAL").upper(), motor, "double") for motor in motors}
|
|
self.epics_pvs.update({str("SARES20-ROB:" + motor.name + ".RBV").upper(): CAS(str("SARES20-ROB:" + motor.name + ".RBV").upper(), motor.readback, "double") for motor in motors})
|
|
|
|
if simulation:
|
|
add_device(RobotBernina("robot","localhost:1234"),force = True)
|
|
else:
|
|
add_device(RobotBernina("robot", "129.129.243.106:1234"), force = True)
|
|
|
|
time.sleep(0.1)
|
|
|
|
|
|
class DummySTP(RegisterBase):
|
|
def __init__(self, val=None):
|
|
self.val = val
|
|
self.initialize()
|
|
def doRead(self):
|
|
return self.val
|
|
def doWrite(self, val):
|
|
self.val=val
|
|
|
|
class MotorConfigPV(ReadonlyRegisterBase):
|
|
def __init__(self, Motor, fieldname):
|
|
self.motor = Motor
|
|
self.initialize()
|
|
def doRead(self):
|
|
return self.motor.config.getFieldValue(fieldname)
|
|
|
|
def make_archivable(Motor):
|
|
base = PVBASE + Motor.name
|
|
pv_def = {
|
|
"": [Motor.readback, "double"],
|
|
".VAL": [Motor, "double"],
|
|
".RBV": [Motor.readback, "double"],
|
|
".EGU": [MotorConfigPV(Motor, "unit"), "string"],
|
|
".ADEL": [DummySTP(0.002), "double"],
|
|
".SCAN": [DummySTP("passive"), "string"],
|
|
".PINI": [DummySTP("No"), "string"],
|
|
".PHAS": [DummySTP(0), "short"],
|
|
".EVNT": [DummySTP(""), "string"],
|
|
".PRIO": [DummySTP(0), "short"],#should be ENUM with 0,1,3 being LOW, MEDIUM, HIGH
|
|
".DISV": [DummySTP(1), "short"],
|
|
".DISA": [DummySTP(0), "short"],
|
|
".SDIS": [DummySTP(""), "string"],
|
|
".PROC": [DummySTP(0), "short"],#should be uchar
|
|
".DISS": [DummySTP(0), "short"],#should be ENUM with 0,1,2,3 being NO?ALARM, MINOR, MAJOR, INVALID
|
|
".LCNT": [DummySTP(0), "short"],#should be uchar
|
|
".PACT": [DummySTP(0), "short"],#should be uchar
|
|
}
|
|
epics_pvs = {base + field: CAS((base+field).upper(), device, dtype) for field, [device, dtype] in pv_def.items()}
|
|
epics_pvs.update({base + ".FLNK": CAS((base+".FLNK").upper(), DummySTP((base+".RBV").upper()), "string")})
|
|
Motor.epics_pvs = epics_pvs
|
|
|
|
#robot.latency = 0.005
|
|
robot.set_default_desc(DESC_DEFAULT)
|
|
robot.default_speed = DEFAULT_SPEED
|
|
robot.set_frame(FRAME_DEFAULT)
|
|
robot.set_frame(FRAME_DEFAULT, name="tcp_f_spherical", change_default=False)
|
|
robot.set_tool(TOOL_DEFAULT)
|
|
robot.setPolling(DEFAULT_ROBOT_POLLING)
|
|
robot.get_current_point() #TODO: REMOVE WHEN CURRENT POINT REPORTED BY POLLING MESSAGE
|
|
robot.set_motors_enabled(True)
|
|
robot.set_joint_motors_enabled(True)
|
|
motors = list(robot.cartesian_motors.values())+list(robot.spherical_motors.values())+list(robot.joint_motors.values())+list(robot.linear_axis_motors.values())
|
|
for m in motors:
|
|
make_archivable(m)
|
|
#robot.motor_to_epics_pvs()
|
|
#show_panel(robot)
|