Files
bernina_robot/script/devices/RobotBernina.py

655 lines
30 KiB
Python

import math
PVBASE = "SARES20-ROB:"
KNOWN_POINTS = P_PARK, P_HOME = "park", "home"
TASKS = MOVE_PARK, MOVE_HOME, TWEAK_X , TWEAK_Y = "movePark", "moveHome", "tweakX", "tweakY"
DESCS = DESC_FAST,DESC_SLOW, = "mNomSpeed", "mNomSpeed"
DESC_DEFAULT = DESCS[0]
FLANGE = "flange"
DEFAULT_ROBOT_POLLING = 200
TASK_WAIT_ROBOT_POLLING = 50
DEFAULT_SPEED=100
ROBOT_JOINT_MOTORS = ["j1" , "j2", "j3", "j4", "j5", "j6"]
ROBOT_CARTESIAN_MOTORS = ["x" , "y", "z", "rx", "ry", "rz"]
ROBOT_SPHERICAL_MOTORS = ["r" , "gamma", "delta"]
FRAME_HIRARCHY = ["world.f_SwissFELCoord.f_ESBWorld.f_linearAxis." + k for k in ["f_direct.", "f_2mRad.", "f_4mRad.", "f_6mRad.", "f_8mRad.",]]
FRAME_HIRARCHY += ["t_flange.t_Adapter." + k for k in ["t_JF01T03det.t_JF01T03.", "t_JF07T32det.t_JF07T32."]]
run("devices/RobotTCP")
override_remote_safety = False
simulation = False
class RobotBernina(RobotTCP):
def __init__(self, name, server, timeout = 1000, retries = 1):
RobotTCP.__init__(self, name, server, timeout, retries)
self.set_tasks(TASKS)
self.set_known_points(KNOWN_POINTS)
self.setPolling(DEFAULT_ROBOT_POLLING)
self.last_command_timestamp = None
self.last_command_position = None
self.override_remote_safety = override_remote_safety
temp = {"frame": Array([]), "tool": Array([]), "pos": Array([])}
default_remote_allowed_recorded = {"spherical": temp, "cartesian": temp, "joint": temp}
default_remote_allowed_deltas = {
"spherical": {'delta': 1., 'r': 50., 'gamma': 1.},
"cartesian": {'rx': 1., 'ry': 1., 'rz': 1., 'x': 1., 'y': 1., 'z': 1.},
"joint": {'j1': 1., 'j2': 1., 'j3': 1., 'j4': 1., 'j5': 1., 'j6': 1.},
}
self.remote_allowed_recorded = AdjustableFS(name="remote_allowed", default_value = default_remote_allowed_recorded)
self.remote_allowed_deltas = AdjustableFS(name="remote_allowed_deltas", default_value = default_remote_allowed_deltas)
def get_frame_tree(self, frame):
"""frame can be a string of a tool or a frame"""
for f in FRAME_HIRARCHY:
if frame in f:
return f.split("." + frame + ".")[0]+"." + frame
return frame
def set_override_remote_safety(self, value):
self.override_remote_safety = bool(value)
def reset_recorded_motions(self, index=None, motion=None):
if any([index is None, motion is None]):
temp = {"frame": Array([]), "tool": Array([]), "pos": Array([])}
default_remote_allowed_recorded = {"spherical": temp, "cartesian": temp, "joint": temp}
self.remote_allowed_recorded(default_remote_allowed_recorded)
return "removed all recorded motions"
else:
rem = self.remote_allowed_recorded()
for k in ["pos", "tool", "frame"]:
rem[motion][k].pop[index]
self.remote_allowed_recorded(rem)
return "removed recorded " + motion + " motion with index " + str(index)
def remote_allowed(self, from_coordinates, to_coordinates, frame_trsf, tool_trsf, motion):
if self.override_remote_safety:
return True
ks = ROBOT_CARTESIAN_MOTORS if motion == "cartesian" else ROBOT_SPHERICAL_MOTORS if motion == "spherical" else ROBOT_JOINT_MOTORS
# spherical motors are missing rx ry rz, maybe even in simulation
a1 = [from_coordinates[k] for k in ks]
a2 = [to_coordinates[k] for k in ks]
ds = Array([self.remote_allowed_deltas()[motion][k] for k in ks])
cds = Array([self.remote_allowed_deltas()[motion][k] for k in ks])
rc = self.remote_allowed_recorded()[motion]
if len(rc["pos"]) > 0:
#preselection by frame with concatenated arrays
if "frame" in rc.keys():
fr = Array(rc["frame"])
idxs = (fr-frame_trsf).abs < cds
idxs_b =[all(idx) for idx in idxs]
rc = {"pos": Array(rc["pos"])[idxs_b], "tool": Array(rc["tool"])[idxs_b]}
if len(rc["pos"]) > 0:
if "tool" in rc.keys():
tr = rc["tool"]
idxs = (tr-tool_trsf).abs < cds
idxs_b =[all(idx) for idx in idxs]
rc = {"pos": Array(rc["pos"])[idxs_b]}
if len(rc["pos"]) > 0:
for tr in rc["pos"]:
idxs1 = (Array(tr)-a1).abs < ds
idxs2 = (Array(tr)-a2).abs < ds
idx1 = [all(idx) for idx in idxs1]
idx2 = [all(idx) for idx in idxs2]
if (any(idx1)) & (any(idx2)):
return True
print("finish")
return False
def general_motion(self, **kwargs):
## check motion type
self.reset_motion()
if (sum([m in kwargs.keys() for m in ROBOT_JOINT_MOTORS])>0) & (sum([m in kwargs.keys() for m in ROBOT_SPHERICAL_MOTORS])==0) & (sum([m in kwargs.keys() for m in ROBOT_CARTESIAN_MOTORS])==0):
move = self.move_joint
elif (sum([m in kwargs.keys() for m in ROBOT_CARTESIAN_MOTORS])>0) & (sum([m in kwargs.keys() for m in ROBOT_SPHERICAL_MOTORS])==0) & (sum([m in kwargs.keys() for m in ROBOT_JOINT_MOTORS])==0):
move = self.move_cartesian
elif (sum([m in kwargs.keys() for m in ROBOT_SPHERICAL_MOTORS])>0) & (sum([m in kwargs.keys() for m in ROBOT_CARTESIAN_MOTORS])==0) & (sum([m in kwargs.keys() for m in ROBOT_JOINT_MOTORS])==0):
move = self.move_spherical
else:
get_context().sendEvent("motion", "\nNo unique coordinate system found for these motions use only unique keywords out of:\n {}\n {}\n {}.".format(ROBOT_JOINT_MOTORS, ROBOT_CARTESIAN_MOTORS, ROBOT_SPHERICAL_MOTORS))
return
self.last_remote_motion = "recording"
move(sync = True, **kwargs)
return True
def record_motion(self, **kwargs):
## check motion type
self.reset_motion(msg = "RECORDING TRAJECTORY TO LIST OF ALLOWED REMOTE MOTIONS - USE HANDHELD TO FINISH MOTION")
if self.working_mode != "manual":
get_context().sendEvent("motion", "Current working mode is {}. Change it to manual using the keyswitch to record motions.".format(self.working_mode))
if (sum([m in kwargs.keys() for m in ROBOT_JOINT_MOTORS])>0) & (sum([m in kwargs.keys() for m in ROBOT_SPHERICAL_MOTORS])==0) & (sum([m in kwargs.keys() for m in ROBOT_CARTESIAN_MOTORS])==0):
k = "joint"
move = self.move_joint
elif (sum([m in kwargs.keys() for m in ROBOT_CARTESIAN_MOTORS])>0) & (sum([m in kwargs.keys() for m in ROBOT_SPHERICAL_MOTORS])==0) & (sum([m in kwargs.keys() for m in ROBOT_JOINT_MOTORS])==0):
k = "cartesian"
move = self.move_cartesian
elif (sum([m in kwargs.keys() for m in ROBOT_SPHERICAL_MOTORS])>0) & (sum([m in kwargs.keys() for m in ROBOT_CARTESIAN_MOTORS])==0) & (sum([m in kwargs.keys() for m in ROBOT_JOINT_MOTORS])==0):
k = "spherical"
move = self.move_spherical
else:
get_context().sendEvent("motion", "\nNo unique coordinate system found for these motions use only unique keywords out of:\n {}\n {}\n {}.".format(ROBOT_JOINT_MOTORS, ROBOT_CARTESIAN_MOTORS, ROBOT_SPHERICAL_MOTORS))
return
if k == "spherical":
interp = Array([robot.cart2sph(return_dict=False, **{k:e for k,e in zip(ROBOT_CARTESIAN_MOTORS,v)}) for v in move(simulate=True, coordinates = "cartesian", **kwargs)]).T.interpolate(10).T
else:
interp = Array(move(simulate=True, coordinates = k, **kwargs)).T.interpolate(10).T
self.last_remote_motion = "recording"
move(sync = True, **kwargs)
frame = Array(self.frame_trsf)
tool = Array(self.tool_trsf)
recorded = self.remote_allowed_recorded()
recorded[k]["frame"].append(frame)
recorded[k]["tool"].append(tool)
recorded[k]["pos"].append(interp)
self.remote_allowed_recorded(recorded)
self.on_recording_finished()
return True
def on_recording_finished(self):
get_context().sendEvent("motion", "Finished, recording")
def set_motion_queue_empty(self, state=False):
### update the status of the robot, the queue of motion commands
### the status is polled automatically from the controller but this setting command does not have a delay
self.empty = state
self._update_state()
def check_software_limits(self, j1=0, j2=0, j3=0, j4=0, j5=0, j6=0):
j = [j1,j2,j3,j4,j5,j6]
self.set_jnt(j, name="tcp_j")
self.evaluate("tcp_b=isInRange(tcp_j)")
return self.get_bool(name="tcp_b")
def deg2rad(self, angles):
if hasattr(angles, "__len__"):
return [a/180.*math.pi for a in angles]
else:
return angles/180.*math.pi
def rad2deg(self, angles):
if hasattr(angles, "__len__"):
return [a*180./math.pi for a in angles]
else:
return angles*180./math.pi
def get_spherical_pos(self, frame = None, return_dict=True):
cur_cart = self.get_cartesian_pos(frame = frame, return_dict=True)
cur_sph = self.cart2sph(return_dict=return_dict, **cur_cart)
return cur_sph
def update_spherical_pos(self):
self.spherical_pos=self.cart2sph(return_dict=True, **self.cartesian_pos)
def get_movel_interpolation(self, i, coordinates="joint", idx_pnt1=0, idx_pnt2=1):
"""
coordinates: "joint" or "cartesian"
returns the interpolated position in joint or cartesian coordinates at fraction i
of the linear motion from point tcp_p_spherical[idx_pnt1] to tcp_p_spherical[idx_pnt2].
Note: i=0 returns the start point, i=1 the end point of the motion
"""
a = self.execute("get_movel_interpolation", i, coordinates, idx_pnt1, idx_pnt2)
ret = []
for i in range(6): ret.append(float(a[i]))
return ret
def get_movec_interpolation(self, i, coordinates="joint"):
"""
coordinates: "joint" or "cartesian"
returns the interpolated position in joint or cartesian coordinates at fraction i
of the circular motion along the circle given by the start point tcp_p_spherical[1],
the intermediate point tcp_p_spherical[2] and the target tcp_p_spherical[3].
Note: i=0 returns the start point, i=1 the end point of the motion
"""
a = self.execute("get_movec_interpolation", i, coordinates)
ret = []
for i in range(6): ret.append(float(a[i]))
return ret
def sph2cart(self, r=None, gamma=None, delta=None, return_dict=True, **kwargs):
gamma, delta = self.deg2rad([gamma, delta])
x = r*math.cos(delta)*math.sin(gamma)
y = r*math.sin(delta)
z = r*math.cos(delta)*math.cos(gamma)
# New try first delta
ry = -math.asin(-math.cos(delta)*math.sin(gamma))
rx = -math.acos(math.cos(delta)*math.cos(gamma)/math.cos(ry))
rz = math.asin(-math.sin(ry)*math.sin(rx)*math.sqrt(1/(math.cos(rx)**2+(math.sin(ry)*math.sin(rx))**2)))
rx, ry, rz = self.rad2deg([rx, ry, rz])
#end new try
print rz
#Necessary to keep detector orientation for gamma larger 90
if self.rad2deg(gamma) > 90:
rz = 180-rz
if self.rad2deg(delta) > 90:
rz = -rz
if abs(rz//180) > 0:
rz = rz - (rz+180)//360*360
if return_dict:
ret = {"x": x, "y": y, "z": z, "rx": rx, "ry": ry, "rz":rz}
else:
ret = [x, y, z, rx, ry, rz]
return ret
def cart2sph(self, x=None, y=None, z=None, return_dict=True, **kwargs):
r = math.sqrt(x**2 + y**2 + z**2)
if z**2+x**2>0:
gamma = math.copysign(1, x)*math.acos(z/math.sqrt(z**2+x**2))
else:
ry = kwargs["ry"] if "ry" in kwargs.keys() else self.cartesian_pos["ry"]
gamma, = self.deg2rad([ry])
delta = math.asin(y/r)
gamma, delta = self.rad2deg([gamma, delta])
if return_dict:
ret = {"r": r, "gamma": gamma, "delta": delta}
else:
ret = [r,gamma, delta]
return ret
def check_calculations(self):
sph = self.get_spherical_pos()
cart_calc = self.sph2cart(**sph)
cart = self.get_cartesian_pos(return_dict=True)
print("spherical", sph)
print("cart_calc", cart_calc)
print("cart", cart)
err = {k: cart[k]-cart_calc[k] for k in cart_calc.keys()}
return err
def point_reachable(self, point,tool=None, verbose=True):
if tool == None:
tool = self.tool()
self.herej()
reachable = self.eval_bool("pointToJoint(" + tool + ",tcp_j," + point + ",j)")
if verbose:
if not reachable:
get_context().sendEvent("motion", "Point is not in range")
print("Point is not in range")
return reachable
def simulate_stored_commands(self):
if self.last_remote_motion == "None":
ret = None
elif self.last_remote_motion == "cartesian":
target = self.cartesian_motors["x"].target_pos
ret = self.move_cartesian(simulate = True, **target)
elif self.last_remote_motion == "spherical":
target = self.spherical_motors["r"].target_pos
ret = self.move_spherical(simulate = True, **target)
elif self.last_remote_motion == "joint":
target = self.joint_motors["j1"].target_pos
ret = self.move_joint(simulate = True, **target)
return ret
def move_cartesian(self, x=None, y=None, z=None, rx=None, ry=None, rz=None, tool=None, frame=None, desc=None, sync=False, simulate=False, coordinates="joint"):
"""
Motion in the cartesian coordinate system using a linear motion movel command to
move from point tcp_p_spherical[0] to tcp_p_spherical[1].
frame: string
The frame defines the cartesian coordinate system and origin. Must exist on the controller.
sync: True or False
If true, no further commands are accepted until the motion is finished.
simulate: True or False
coordinates: "joint" or "cartesian"
If simulate = True, an array of interpolated positions in either joint or cartesian
coordinates is returned. Setting coordinates only has an effect, when the motion is simulated.
"""
sim = []
if frame == None:
frame = self.frame()
if tool == None:
tool = self.tool()
if desc == None:
desc = self.default_desc
self.set_frame(frame, name="tcp_f_spherical", change_default=False)
target_cart = {"x":x, "y":y, "z":z, "rx":rx, "ry":ry, "rz":rz}
current_cart = self.get_cartesian_pos(frame = frame, return_dict=True)
for k, v in target_cart.items():
if v == None:
target_cart[k] = current_cart[k]
if (self.working_mode == "remote") & (~simulate):
if not self.remote_allowed(current_cart, target_cart, frame_trsf=self.get_trsf(frame+".trsf"), tool_trsf=self.get_trsf(tool+".trsf"), motion="cartesian"):
get_context().sendEvent("motion", "Requested remote motion is not in recorded trajectories. Use rob.record_motion(*args) to record the motion in manual working mode first.")
return False
self.cartesian_destination = target_cart
self.set_pnt([current_cart[k] for k in ["x", "y", "z", "rx", "ry", "rz"]], name="tcp_p_spherical[0]")
self.set_pnt([target_cart[k] for k in ["x", "y", "z", "rx", "ry", "rz"]], name="tcp_p_spherical[1]")
if self.point_reachable("tcp_p_spherical[1]", verbose=True):
if simulate:
for i in range(11):
sim.append(self.get_movel_interpolation(i/10., coordinates=coordinates))
else:
self.set_motion_queue_empty(False)
ret = self.movel("tcp_p_spherical[1]", tool=tool, desc=None, sync=sync)
else:
return False
if simulate:
return sim
else:
return ret
def move_spherical(self, r=None, gamma=None, delta=None, tool=None, frame=None, desc=None, sync=False, simulate=False, coordinates="joint"):
"""
Motion in the spherical coordinate system using a linear moteion movel command to
change the radius from point tcp_p_spherical[0] to tcp_p_spherical[1], followed
by a circular motion along the circle given by the start point tcp_p_spherical[1],
the intermediate point tcp_p_spherical[2] and the target tcp_p_spherical[3].
frame: string
The frame defines the cartesian coordinate system and origin. Must exist on the controller.
sync: True or False
If true, no further commands are accepted until the motion is finished.
simulate: True or False
coordinates: "joint" or "cartesian"
If simulate = True, an array of interpolated positions in either joint or cartesian
coordinates is returned. Setting coordinates only has an effect, when the motion is simulated.
"""
sim = []
if frame == None:
frame = self.frame()
if tool == None:
tool = self.tool()
if desc == None:
desc = self.default_desc
self.set_frame(frame, name="tcp_f_spherical", change_default=False)
current_cart = self.get_cartesian_pos(frame = frame, return_dict=True)
current_sph = self.get_spherical_pos(frame = frame, return_dict=True)
target_sph = {"r": r, "gamma": gamma, "delta": delta}
self.spherical_destination = target_sph
for k, v in target_sph.items():
if v == None:
target_sph[k] = current_sph[k]
if (self.working_mode == "remote") & (~simulate):
if not self.remote_allowed(current_sph, target_sph, frame_trsf=self.get_trsf(frame+".trsf"), tool_trsf=self.get_trsf(tool+".trsf"), motion="spherical"):
get_context().sendEvent("motion", "Requested remote motion is not in recorded trajectories. Use rob.record_motion(*args) to record the motion in manual working mode first.")
return False
## check if current point is on spherical trajectory (rx, ry, rz)
calculated_cartesian_starting_pos = self.sph2cart(**self.cart2sph(**current_cart))
is_spherical_starting_point = [abs(current_cart[k] - calculated_cartesian_starting_pos[k]) < 1. for k in ["x", "y", "z"]]
is_spherical_starting_point = is_spherical_starting_point + [abs(abs(abs(current_cart[k] - calculated_cartesian_starting_pos[k])-180)-180) < 1. for k in ["rx", "ry", "rz"]]
is_spherical_starting_point=all(is_spherical_starting_point)
if not is_spherical_starting_point:
print("Moving linear as starting point is not on spherical position")
target_cart = current_cart.copy()
target_cart.update(self.sph2cart(return_dict=True, **target_sph))
return self.move_cartesian(tool=tool, frame=frame, desc=desc, sync=sync, simulate=simulate, coordinates=coordinates, **target_cart)
# First only move in the radial direction
target_sph_linear = current_sph.copy()
target_sph_linear["r"] = target_sph["r"]
target_cart_linear = current_cart.copy()
tmp = self.sph2cart(return_dict=True, **target_sph_linear)
target_cart_linear.update({k:tmp[k] for k in ["x", "y", "z"]})
self.set_pnt([current_cart[k] for k in ["x", "y", "z", "rx", "ry", "rz"]], name="tcp_p_spherical[0]")
self.set_pnt([target_cart_linear[k] for k in ["x", "y", "z", "rx", "ry", "rz"]], name="tcp_p_spherical[1]")
if (self.point_reachable("tcp_p_spherical[1]"))&(self.distance_p("tcp_p_spherical[0]", "tcp_p_spherical[1]")>.1):
if simulate:
for i in range(11):
sim.append(self.get_movel_interpolation(i/10., coordinates=coordinates))
else:
self.set_motion_queue_empty(False)
ret = self.movel("tcp_p_spherical[1]", tool=tool, desc=None, sync=sync)
print("moving radius")
# Next move angles
target_cart = current_cart.copy()
target_cart.update(self.sph2cart(return_dict=True, **target_sph))
self.cartesian_destination = [target_cart[k] for k in ["x", "y", "z", "rx", "ry", "rz"]]
self.set_pnt([target_cart[k] for k in ["x", "y", "z", "rx", "ry", "rz"]], name="tcp_p_spherical[3]")
intermediate_cart = current_cart.copy()
intermediate_sph = target_sph.copy()
intermediate_sph.update({
"gamma": (target_sph["gamma"]+current_sph["gamma"])/2,
"delta": (target_sph["delta"]+current_sph["delta"])/2
})
intermediate_cart.update(self.sph2cart(return_dict=True, **intermediate_sph))
self.set_pnt([intermediate_cart[k] for k in ["x", "y", "z", "rx", "ry", "rz"]], name="tcp_p_spherical[2]")
distance = self.distance_p("tcp_p_spherical[1]", "tcp_p_spherical[3]")
if (self.point_reachable("tcp_p_spherical[3]"))&(distance>.1):
if (self.point_reachable("tcp_p_spherical[2]"))& (distance>1):
if simulate:
if len(sim)==0:
for i in range(11):
sim.append(self.get_movec_interpolation(i/10., coordinates=coordinates))
else:
for i in range(10):
sim.append(self.get_movec_interpolation(i/10.+0.1, coordinates=coordinates))
else:
ret = self.movec("tcp_p_spherical[2]", "tcp_p_spherical[3]", tool=None, desc=None, sync=sync)
print("moving angle circle")
else:
print("moving angle linear")
if simulate:
if len(sim)==0:
for i in range(11):
sim.append(self.get_movel_interpolation(i/10., coordinates=coordinates, idx_pnt1=1, idx_pnt2=3))
else:
for i in range(10):
sim.append(self.get_movel_interpolation(i/10.+0.1, coordinates=coordinates, idx_pnt1=1, idx_pnt2=3))
else:
self.set_motion_queue_empty(False)
ret = self.movel("tcp_p_spherical[3]", tool=None, desc=None, sync=sync)
if simulate:
return sim
def move_joint(self, j1=None, j2=None, j3=None, j4=None, j5=None, j6=None, tool=None, desc=None, sync=False, simulate=False, coordinates="joint"):
# Simulation not implemented yet
if desc == None:
desc = self.default_desc
if tool == None:
tool = self.tool()
target_jnt = {"j1": j1, "j2": j2, "j3": j3, "j4": j4, "j5": j5, "j6": j6}
current_jnt = self.get_joint_pos(return_dict=True)
for k, v in target_jnt.items():
if v == None:
target_jnt[k] = current_jnt[k]
if (self.working_mode == "remote") & (~simulate):
if not self.remote_allowed(current_jnt, target_jnt, frame_trsf=None, tool_trsf=None, motion="joint"):
get_context().sendEvent("motion", "Requested remote motion is not in recorded trajectories. Use rob.record_motion(*args) to record the motion in manual working mode first.")
return False
if simulate:
sim = []
cur_joint = [current_jnt[k] for k in ["j1", "j2", "j3", "j4", "j5", "j6"]]
delta_joint = [target_jnt[k]-current_jnt[k] for k in ["j1", "j2", "j3", "j4", "j5", "j6"]]
for i in range(11):
sim.append([j0+dj*i/10 for j0, dj in zip(cur_joint, delta_joint)])
else:
self.set_jnt([target_jnt[k] for k in ["j1", "j2", "j3", "j4", "j5", "j6"]], name="tcp_j")
self.set_motion_queue_empty(False)
self.movej("tcp_j", tool=tool , desc=desc, sync=sync)
if simulate:
return sim
def move_home(self):
if not self.is_in_point(P_HOME):
self.start_task(MOVE_HOME)
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_home()
def move_park(self):
if not self.is_in_point(P_PARK):
self.start_task(MOVE_PARK)
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
self.assert_park()
def tweak_x(self, offset):
self.start_task(TWEAK_X, offset)
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
def set_powered(self, value):
if value:
self.enable()
if not value:
self.disable()
def tweak_y(self, offset):
self.start_task(TWEAK_Y, offset)
self.wait_task_finished(TASK_WAIT_ROBOT_POLLING)
def on_event(self,ev):
#print "EVT: " + ev
pass
def on_task_finished(self, task, ret):
if self.isSimulated():
if ret is not None and ret <0:
if task==MOVE_HOME: self.simulated_point=P_HOME
elif task==MOVE_PARK: self.simulated_point=P_PARK
else: self.simulated_point=None
def doUpdate(self):
#start = time.time()
RobotTCP.doUpdate(self)
def start_task(self, program, *args, **kwargs):
#TODO: Check safe position
self.task_name = program
self.task_args = args
self.task_kwargs = kwargs
ret = RobotTCP.start_task(self, program, *args, **kwargs)
def stop_task(self):
RobotTCP.stop_task(self)
#TODO: Restore safe position
def set_remote_mode(self):
self.set_profile("remote")
def set_local(self):
self.set_profile("default")
def is_park(self):
return self.is_in_point(P_PARK)
def is_home(self):
return self.is_in_point(P_HOME)
def is_cleared(self):
return self.get_current_point() is not None
def assert_park(self):
self.assert_in_point(P_PARK)
def assert_home(self):
self.assert_in_point(P_HOME)
def assert_cleared(self):
if not self.is_cleared():
raise Exception("Robot not in cleared position")
def wait_ready(self):
self.waitState(State.Ready, 1000) #robot.state.assertReady()
def motor_to_epics_pvs(self, motors=[]):
self.epics_pvs = {str("SARES20-ROB:" + motor.name + ".VAL").upper(): CAS(str("SARES20-ROB:" + motor.name + ".VAL").upper(), motor, "double") for motor in motors}
self.epics_pvs.update({str("SARES20-ROB:" + motor.name + ".RBV").upper(): CAS(str("SARES20-ROB:" + motor.name + ".RBV").upper(), motor.readback, "double") for motor in motors})
if simulation:
add_device(RobotBernina("robot","localhost:1234"),force = True)
else:
add_device(RobotBernina("robot", "129.129.243.106:1234"), force = True)
time.sleep(0.1)
class DummySTP(RegisterBase):
def __init__(self, val=None):
self.val = val
self.initialize()
def doRead(self):
return self.val
def doWrite(self, val):
self.val=val
class MotorConfigPV(ReadonlyRegisterBase):
def __init__(self, Motor, fieldname):
self.motor = Motor
self.fieldname = fieldname
self.initialize()
def doRead(self):
return self.motor.config.getFieldValue(self.fieldname)
def getUnit(self):
return self.motor.getUnit()
def make_archivable(Motor):
base = PVBASE + Motor.name
pv_def = {
"": [Motor.readback, "double"],
".VAL": [Motor, "double"],
".RBV": [Motor.readback, "double"],
".EGU": [MotorConfigPV(Motor, "unit"), "string"],
".ADEL": [DummySTP(-1), "double"],
#".ADEL": [DummySTP(0.002), "double"],
".MDEL": [DummySTP(0), "double"],
".RTYP": [DummySTP("motor"), "string"],
".SCAN": [DummySTP("passive"), "string"],
".PINI": [DummySTP("No"), "string"],
".PHAS": [DummySTP(0), "short"],
".PREC": [DummySTP(3), "short"],
".EVNT": [DummySTP(""), "string"],
".PRIO": [DummySTP(0), "short"],#should be ENUM with 0,1,3 being LOW, MEDIUM, HIGH
".DISV": [DummySTP(1), "short"],
".DISA": [DummySTP(0), "short"],
".SDIS": [DummySTP(""), "string"],
".PROC": [DummySTP(0), "short"],#should be uchar
".DISS": [DummySTP(0), "short"],#should be ENUM with 0,1,2,3 being NO?ALARM, MINOR, MAJOR, INVALID
".LCNT": [DummySTP(0), "short"],#should be uchar
".PACT": [DummySTP(0), "short"],#should be uchar
".NAME": [DummySTP(base), "string"],
".DESC": [DummySTP(base), "string"],
".ASG": [DummySTP(""), "string"],
".TSE": [DummySTP(0), "short"],
".TSEL": [DummySTP(""), "string"],
".DTYP": [DummySTP("asynMotor"), "string"],
".DISP": [DummySTP(0), "double"],
".PUTF": [DummySTP(0), "double"],
".PPRO": [DummySTP(0), "double"],
".TPRO": [DummySTP(0), "double"],
".STAT": [DummySTP(0), "short"],
".SEVR": [DummySTP(0), "short"],
".NSTA": [DummySTP(0), "short"],
".NSEV": [DummySTP(0), "short"],
".ACKS": [DummySTP(2), "short"],
".ACKT": [DummySTP(0), "short"],
}
epics_pvs = {base + field: CAS((base+field).upper(), device, dtype) for field, [device, dtype] in pv_def.items()}
epics_pvs.update({base + ".FLNK": CAS((base+".FLNK").upper(), DummySTP((base+".RBV").upper()), "string")})
Motor.epics_pvs = epics_pvs
#robot.latency = 0.005
robot.set_default_desc(DESC_DEFAULT)
robot.default_speed = DEFAULT_SPEED
robot.set_frame(robot.frame_persistent())
robot.set_frame(robot.frame(), name="tcp_f_spherical", change_default=False)
robot.set_tool(robot.tool_persistent())
robot.setPolling(DEFAULT_ROBOT_POLLING)
robot.get_current_point() #TODO: REMOVE WHEN CURRENT POINT REPORTED BY POLLING MESSAGE
robot.set_motors_enabled(True)
robot.set_joint_motors_enabled(True)
motors = list(robot.cartesian_motors.values())+list(robot.spherical_motors.values())+list(robot.joint_motors.values())+list(robot.linear_axis_motors.values())
for m in motors:
make_archivable(m)
#robot.motor_to_epics_pvs()
#show_panel(robot)