from PShellClient import PShellClient
import json
import time
import sys


class TellClient(PShellClient):
    """PShellClient specialisation exposing sample-changer commands.

    Every helper builds a statement string and hands it to the inherited
    ``eval`` / ``start_eval`` methods. Statements passed to ``eval`` end with
    ``&`` wherever the original code used it (presumably a request for
    background evaluation on the server -- confirm against PShell docs).
    """

    def __init__(self, url):
        """Connect to the server at *url* and subscribe to its events."""
        PShellClient.__init__(self, url)
        # Must exist before the event loop starts: on_event() reads it and
        # may be invoked as soon as the first "shell" event arrives.
        self.debug = False
        self.start_sse_event_loop_task(["state", "shell"])
        self.state = self.get_state()

    def on_event(self, name, value):
        """Handle a subscribed event: cache state changes, echo shell output.

        "state" events update ``self.state`` and are printed; "shell" events
        are printed only when ``self.debug`` is true.
        """
        if name == "state":
            self.state = value
            print("State: ", value)
        elif name == "shell":
            if self.debug:
                print("> ", value)

    def get_state(self):
        """Query the state from the base client, cache it and return it."""
        self.state = PShellClient.get_state(self)
        return self.state

    def wait_ready(self):
        """Block while the cached state is "Busy".

        Raises:
            Exception: if the state after waiting is not "Ready".
        """
        polls = 0
        # The state is normally updated by events; it is additionally polled
        # about once per second (100 x 10 ms) just in case an event is missed.
        while self.state == "Busy":
            time.sleep(0.01)
            polls += 1
            if polls >= 100:
                polls = 0
                self.get_state()
        if self.state != "Ready":
            raise Exception("Invalid state: " + str(self.state))

    def set_in_mount_position(self, value):
        """Assign ``str(value)`` to the remote ``in_mount_position`` variable."""
        self.eval("in_mount_position = " + str(value) + "&")

    def is_in_mount_position(self):
        """Return True if the remote ``in_mount_position`` evaluates to "true"."""
        return self.eval("in_mount_position&").lower() == "true"

    def get_samples_info(self):
        """Return the JSON-decoded result of the remote ``get_samples_info()``."""
        return json.loads(self.eval("get_samples_info()&"))

    def set_samples_info(self, info):
        """Send *info*, JSON-encoded, to the remote ``set_samples_info``."""
        self.eval("set_samples_info(" + json.dumps(info) + ")&")

    def start_cmd(self, cmd, *argv):
        """Start the remote call ``cmd(*argv)`` and return its handle.

        String arguments are sent as quoted literals (``repr`` escapes any
        embedded quotes or backslashes); every other argument is sent as
        ``str(arg)``. The cached state is refreshed after the call is started.

        Returns:
            Whatever ``start_eval`` returns; pass it to :meth:`wait_cmd`.
        """
        rendered = ", ".join(
            repr(arg) if isinstance(arg, str) else str(arg) for arg in argv
        )
        ret = self.start_eval(cmd + "(" + rendered + ")")
        self.get_state()
        return ret

    def wait_cmd(self, cmd):
        """Wait for the command started by :meth:`start_cmd` and return its value.

        Args:
            cmd: the handle returned by :meth:`start_cmd`.

        Raises:
            Exception: if the state is not "Ready" after waiting, or if the
                command result carries a non-None "exception" entry.
        """
        self.wait_ready()
        result = self.get_result(cmd)
        if result["exception"] is not None:
            raise Exception(result["exception"])
        return result["return"]

    def mount(self, segment, puck, sample, force=False, read_dm=False, auto_unmount=False):
        """Start the remote ``mount`` command; returns the command handle."""
        return self.start_cmd("mount", segment, puck, sample, force, read_dm, auto_unmount)

    def unmount(self, segment=None, puck=None, sample=None, force=False):
        """Start the remote ``unmount`` command; returns the command handle."""
        return self.start_cmd("unmount", segment, puck, sample, force)

    def scan_pin(self, segment, puck, sample, force=False):
        """Start the remote ``scan_pin`` command; returns the command handle."""
        return self.start_cmd("scan_pin", segment, puck, sample, force)

    def scan_puck(self, segment, puck, force=False):
        """Start the remote ``scan_puck`` command; returns the command handle."""
        return self.start_cmd("scan_puck", segment, puck, force)

    def dry(self, heat_time=30.0, speed=0.5, wait_cold=30.0):
        """Start the remote ``dry`` command; returns the command handle."""
        return self.start_cmd("dry", heat_time, speed, wait_cold)

    def move_cold(self):
        """Start the remote ``move_cold`` command; returns the command handle."""
        return self.start_cmd("move_cold")

    def trash(self):
        """Start the remote ``trash`` command; returns the command handle."""
        return self.start_cmd("trash")

    def abort_cmd(self):
        """Abort via the base client, then evaluate ``robot.stop_task()``."""
        self.abort()
        self.eval("robot.stop_task()&")

    def set_gonio_mount_position(self, homing=False):
        """Evaluate ``set_mount_position()``, preceded by ``home_fast_table()``
        when *homing* is true.

        NOTE(review): unlike the other helpers these statements carry no
        trailing ``&``; this is kept as in the original -- confirm it is
        intentional.
        """
        if homing:
            self.eval("home_fast_table()")
        self.eval("set_mount_position()")

    def get_mounted_sample(self):
        """Return the stripped 'mounted_sample_position' setting, or None if empty."""
        ret = self.eval("get_setting('mounted_sample_position')&").strip()
        return ret if ret else None

    def get_system_check(self):
        """Evaluate ``system_check()``.

        Returns:
            "Ok" when the evaluation succeeds; otherwise the caught exception
            object (it is returned, not raised).
        """
        try:
            self.eval("system_check()&")
        except Exception as ex:
            return ex
        return "Ok"

    def get_robot_state(self):
        """Return the result of evaluating ``robot.state``."""
        return self.eval("robot.state&")

    def get_robot_status(self):
        """Return the result of evaluating ``robot.take()``."""
        return self.eval("robot.take()&")

    def get_detected_pucks(self):
        """Return the result of evaluating ``get_detected_pucks()``."""
        return self.eval("get_detected_pucks()&")

    def set_pin_offset(self, value):
        """Evaluate ``set_pin_offset`` with ``str(value)`` as its argument."""
        self.eval("set_pin_offset(" + str(value) + ")&")

    def get_pin_offset(self):
        """Return the result of evaluating ``get_pin_offset()``."""
        return self.eval("get_pin_offset()&")

    def print_info(self):
        """Print a summary of the values returned by the getters above."""
        print("State: " + str(self.get_state()))
        print("Mounted sample: " + str(self.get_mounted_sample()))
        print("System check: " + str(self.get_system_check()))
        print("Robot state: " + str(self.get_robot_state()))
        print("Robot status: " + str(self.get_robot_status()))
        print("Detected pucks: " + str(self.get_detected_pucks()))
        print("Pin offset: " + str(self.get_pin_offset()))
        print("Mount position: " + str(self.is_in_mount_position()))
        print("")


if __name__ == "__main__":
    # Demo sequence run against a hard-coded server.
    tell = TellClient("http://Alexandres-MBP.psi.ch:8080")
    tell.print_info()

    info = [
        {
            "userName": "User",
            "dewarName": "Dewar",
            "puckName": "Puck",
            "puckBarcode": "XXX0001",
            "puckType": "Minispine",
            "puckAddress": "",
            "sampleName": "Sample",
            "sampleBarcode": "YYY0001",
            "samplePosition": "1",
            "sampleStatus": "Present",
            "sampleMountCount": "0",
        },
    ]

    print(tell.get_samples_info())
    tell.set_samples_info(info)
    print(tell.get_samples_info())

    tell.abort_cmd()

    cmd = tell.move_cold()
    print(tell.wait_cmd(cmd))

    cmd = tell.trash()
    print(tell.wait_cmd(cmd))

    cmd = tell.scan_pin("A", 1, 1)
    print(tell.wait_cmd(cmd))

    cmd = tell.scan_puck("A", 1, 1)
    print(tell.wait_cmd(cmd))

    cmd = tell.mount("A", 1, 1)
    print(tell.wait_cmd(cmd))
    print("Mounted sample: " + str(tell.get_mounted_sample()))

    cmd = tell.unmount()
    print(tell.wait_cmd(cmd))
    print("Mounted sample: " + str(tell.get_mounted_sample()))