from PShellClient import PShellClient import json import time import sys class BerninaRobotClient(PShellClient): def __init__(self, url): PShellClient.__init__(self, url) self.start_sse_event_loop_task(["state", "shell"]) self.state = self.get_state() self.debug=False def on_event(self, name, value): if name == "state": self.state = value print ("State: ", value) elif name == "shell": if self.debug: print ("> ", value) def get_state(self): self.state = PShellClient.get_state(self) return self.state def wait_ready(self): count = 0 #Monitors event but polls every second just n case an event is missed while (True): if self.state != "Busy": break time.sleep(0.01) count = count + 1 if count>=100: count=0 self.get_state() if self.state != "Ready": raise Exception("Invalid state: " + str(self.state)) def start_cmd(self, cmd, *argv): cmd = cmd + "(" for a in argv: cmd = cmd + (("'" + a + "'") if type(a) is str else str(a) ) + ", " cmd = cmd + ")" ret = self.start_eval(cmd) self.get_state() return ret def wait_cmd(self, cmd): self.wait_ready() result = self.get_result(cmd) #print (result) if result["exception"] is not None: raise Exception(result["exception"] ) return result["return"] def abort_cmd(self): self.abort() self.eval("robot.stop_task()&") def get_robot_state(self): return self.eval("robot.state&") def get_robot_status(self): status = self.eval("robot.take()&") return status def print_info(self): print ("State: " + str(self.get_state())) print ("Robot state: " + str(self.get_robot_state())) print ("Robot status: " + str(self.get_robot_status())) print ("") #High-level commands def move_park(self, wait=True): cmd = self.start_cmd("robot.move_park",) return self.wait_cmd(cmd) if wait else cmd def move_home(self, wait=True): cmd = self.start_cmd("robot.move_home") return self.wait_cmd(cmd) if wait else cmd def tweak_x(self, offset, wait=True): cmd = self.start_cmd("robot.tweak_x", offset) return self.wait_cmd(cmd) if wait else cmd