120 lines
3.7 KiB
Python
120 lines
3.7 KiB
Python
import ch.psi.pshell.device.DiscretePositionerBase as DiscretePositionerBase
|
|
import requests
|
|
import json
|
|
|
|
class Hexiposi(DiscretePositionerBase):
|
|
def __init__(self, name, url):
|
|
DiscretePositionerBase.__init__(self, name, ["A","B","C","D","E","F"])
|
|
if not url.startswith("http://"):
|
|
url = "http://" + url
|
|
if not url.endswith("/"):
|
|
url = url + "/"
|
|
self.url = url
|
|
|
|
def doInitialize(self):
|
|
super(Hexiposi, self).doInitialize()
|
|
self.val = self.doReadReadback()
|
|
|
|
def get_response(self, response):
|
|
if (response.status_code!=200):
|
|
raise Exception (response.text)
|
|
return json.loads(response.text)
|
|
|
|
def get_status(self):
|
|
self.status = self.get_response(requests.get(url=self.url+"get"))
|
|
self.estop = self.status["estop"]
|
|
self.homed = self.status["homed"]
|
|
self.error = self.status["errorCode"]
|
|
self.remote = self.status["mode"] == "remote"
|
|
self.moving = self.status["errorCode"]
|
|
self.pos = self.status["position"]
|
|
self.moving = self.status["moving"]
|
|
self.offset = self.status["offset"]
|
|
self.dpos = self.status["discretePosition"]
|
|
if (self.homed==False): self.rback = self.UNKNOWN_POSITION
|
|
elif self.dpos == 1: self.rback = "A"
|
|
elif self.dpos == 2: self.rback = "B"
|
|
elif self.dpos == 4: self.rback = "C"
|
|
elif self.dpos == 8: self.rback = "D"
|
|
elif self.dpos == 16: self.rback = "E"
|
|
elif self.dpos == 32: self.rback = "F"
|
|
else: self.rback = self.UNKNOWN_POSITION
|
|
return self.status
|
|
|
|
def move_pos(self, pos):
|
|
return self.get_response(requests.get(url=self.url+"set?pos=" + str(pos)))
|
|
|
|
def move_home(self):
|
|
return self.get_response(requests.get(url=self.url+"set?home=1"))
|
|
|
|
def stop_move(self):
|
|
return self.get_response(requests.get(url=self.url+"set?stop=1"))
|
|
|
|
def set_offset(self, offset):
|
|
return self.get_response(requests.get(url=self.url+"setOffset?offset="+str(offset)))
|
|
|
|
def doUpdate(self):
|
|
self.get_status()
|
|
super(Hexiposi, self).doUpdate()
|
|
|
|
|
|
def doStop(self):
|
|
self.stop_move()
|
|
|
|
def doRead(self):
|
|
return str(self.val)
|
|
|
|
def doReadReadback(self):
|
|
self.get_status()
|
|
return self.rback
|
|
|
|
def doWrite(self, val):
|
|
val = ord(val) - ord('A') +1
|
|
if val<1 or val>6:
|
|
raise Exception("Invalid value: " + str(val))
|
|
moving = val != self.val
|
|
self.val = val
|
|
self.move_pos(self.val)
|
|
#Workaround as state does not changes immediatelly
|
|
if moving:
|
|
try:
|
|
self.waitState(State.Busy,1200)
|
|
except:
|
|
pass
|
|
|
|
def assert_in_position(self, pos):
|
|
return take() == pos
|
|
|
|
#def isReady(self):
|
|
# self.get_status()
|
|
# return self.moving == False
|
|
|
|
def updateState(self):
|
|
if self.homed == False:
|
|
self.setState(State.Disabled)
|
|
elif self.moving:
|
|
self.setState(State.Busy)
|
|
else:
|
|
self.setState(State.Ready)
|
|
|
|
|
|
|
|
#http://129.129.110.83:8002/hexiposi/get
|
|
dev = Hexiposi("hexiposi", "myriotell:8002/hexiposi")
|
|
|
|
add_device(dev, True)
|
|
hexiposi.polling=500
|
|
#print dev.url
|
|
#print dev.get_status()
|
|
|
|
class hexiposi_position(ReadonlyRegisterBase):
|
|
def doRead(self):
|
|
try:
|
|
return float(hexiposi.pos)
|
|
except:
|
|
return float("nan")
|
|
|
|
add_device(hexiposi_position(), True)
|
|
hexiposi_position.polling = 1000
|
|
|