91 lines
2.6 KiB
Python
91 lines
2.6 KiB
Python
from PShellClient import PShellClient
|
|
import json
|
|
import time
|
|
import sys
|
|
|
|
|
|
|
|
class BerninaRobotClient(PShellClient):
|
|
def __init__(self, url):
|
|
PShellClient.__init__(self, url)
|
|
self.start_sse_event_loop_task(["state", "shell"])
|
|
self.state = self.get_state()
|
|
self.debug=False
|
|
|
|
def on_event(self, name, value):
|
|
if name == "state":
|
|
self.state = value
|
|
print ("State: ", value)
|
|
elif name == "shell":
|
|
if self.debug:
|
|
print ("> ", value)
|
|
|
|
def get_state(self):
|
|
self.state = PShellClient.get_state(self)
|
|
return self.state
|
|
|
|
def wait_ready(self):
|
|
count = 0
|
|
#Monitors event but polls every second just n case an event is missed
|
|
while (True):
|
|
if self.state != "Busy":
|
|
break
|
|
time.sleep(0.01)
|
|
count = count + 1
|
|
if count>=100:
|
|
count=0
|
|
self.get_state()
|
|
if self.state != "Ready":
|
|
raise Exception("Invalid state: " + str(self.state))
|
|
|
|
|
|
def start_cmd(self, cmd, *argv):
|
|
cmd = cmd + "("
|
|
for a in argv:
|
|
cmd = cmd + (("'" + a + "'") if type(a) is str else str(a) ) + ", "
|
|
cmd = cmd + ")"
|
|
ret = self.start_eval(cmd)
|
|
self.get_state()
|
|
return ret
|
|
|
|
def wait_cmd(self, cmd):
|
|
self.wait_ready()
|
|
result = self.get_result(cmd)
|
|
#print (result)
|
|
if result["exception"] is not None:
|
|
raise Exception(result["exception"] )
|
|
return result["return"]
|
|
|
|
def abort_cmd(self):
|
|
self.abort()
|
|
self.eval("robot.stop_task()&")
|
|
|
|
def get_robot_state(self):
|
|
return self.eval("robot.state&")
|
|
|
|
|
|
def get_robot_status(self):
|
|
status = self.eval("robot.take()&")
|
|
return status
|
|
|
|
def print_info(self):
|
|
print ("State: " + str(self.get_state()))
|
|
print ("Robot state: " + str(self.get_robot_state()))
|
|
print ("Robot status: " + str(self.get_robot_status()))
|
|
print ("")
|
|
|
|
|
|
#High-level commands
|
|
def move_park(self, wait=True):
|
|
cmd = self.start_cmd("robot.move_park",)
|
|
return self.wait_cmd(cmd) if wait else cmd
|
|
|
|
def move_home(self, wait=True):
|
|
cmd = self.start_cmd("robot.move_home")
|
|
return self.wait_cmd(cmd) if wait else cmd
|
|
|
|
def tweak_x(self, offset, wait=True):
|
|
cmd = self.start_cmd("robot.tweak_x", offset)
|
|
return self.wait_cmd(cmd) if wait else cmd
|
|
|