166 lines
5.5 KiB
Python
166 lines
5.5 KiB
Python
import ch.psi.pshell.device.DiscretePositionerBase as DiscretePositionerBase
|
|
import requests
|
|
import json
|
|
|
|
class Hexiposi(DiscretePositionerBase):
    """Discrete positioner for a Hexiposi stage controlled over HTTP.

    The device exposes six named positions, "A" to "F".  Commands and status
    queries are plain HTTP GET requests answered with JSON documents.

    Attributes refreshed by get_status() from the status document:
        status  -- the last decoded status document
        estop, homed, error, remote, moving, pos, offset, dpos
    Other attributes:
        rback   -- last readback (position name or UNKNOWN_POSITION)
        val     -- last setpoint, stored as a position name
        offline -- True when the last status request failed
    """

    # "discretePosition" value reported by the device -> position name.
    _DPOS_TO_NAME = {1: "A", 2: "B", 4: "C", 8: "D", 16: "E", 32: "F"}

    def __init__(self, name, url):
        """Create the device.

        name -- device name handed to the base class
        url  -- host of the Hexiposi service; "http://" is prepended when no
                http/https scheme is given, and the port is appended
        """
        DiscretePositionerBase.__init__(self, name, ["A", "B", "C", "D", "E", "F"])
        self.PORT_SET = 8002
        self.PORT_GET = 8002
        # Accept an explicit https:// scheme instead of prefixing it again.
        if not url.startswith(("http://", "https://")):
            url = "http://" + url
        if not url.endswith(":"):
            url = url + ":"
        self.url_set = url + str(self.PORT_SET) + "/hexiposi/"
        self.url_get = url + str(self.PORT_GET) + "/hexiposi/"
        self.moved = True
        self.homing_state = State.Disabled
        self.rback = self.UNKNOWN_POSITION
        self.timeout = 3.0
        self.offline = False
        # Defaults so that methods reading these attributes do not fail with
        # AttributeError before the first successful status request.
        self.val = self.UNKNOWN_POSITION
        self.status = None
        self.estop = None
        self.homed = False
        self.error = None
        self.remote = None
        self.moving = False
        self.pos = None
        self.offset = None
        self.dpos = None

    def doInitialize(self):
        """Initialize the base class and seed the setpoint with the readback."""
        super(Hexiposi, self).doInitialize()
        self.val = self.doReadReadback()

    def get_response(self, response):
        """Decode the JSON body of an HTTP response.

        Raises Exception carrying the response text when the HTTP status
        code is not 200.
        """
        if response.status_code != 200:
            raise Exception(response.text)
        return json.loads(response.text)

    def get_status(self):
        """Fetch the device status and refresh the cached attributes.

        Returns the decoded status document.  On any failure the device is
        flagged offline, updateState() is called and the error is re-raised.
        """
        try:
            self.status = self.get_response(
                requests.get(url=self.url_get + "get", timeout=self.timeout))
            status = self.status
            self.estop = status["estop"]
            self.homed = status["homed"]
            self.error = status["errorCode"]
            self.remote = status["mode"] == "remote"
            self.pos = status["position"]
            self.moving = status["moving"]
            self.offset = status["offset"]
            self.dpos = status["discretePosition"]

            if self.homed == False:
                rback = self.UNKNOWN_POSITION
            else:
                rback = self._DPOS_TO_NAME.get(self.dpos, self.UNKNOWN_POSITION)

            # Flag a move whenever the readback changed or cannot be resolved.
            if (rback == self.UNKNOWN_POSITION) or (rback != self.rback):
                self.moved = True
            self.rback = rback
            self.offline = False
            return self.status
        except:
            # Deliberately broad: whatever went wrong, the device is reported
            # as offline and the original error is propagated to the caller.
            self.offline = True
            self.updateState()
            raise

    def set_deadband(self, value=0.1):  # degrees
        """Set the positioning deadband and check the value echoed back.

        Returns value when the "deadbandOutput" field of the reply equals it;
        raises Exception otherwise.
        """
        ret = self.get_response(requests.get(
            url=self.url_set + "setDeadband?deadband=" + str(value),
            timeout=self.timeout))
        if ret["deadbandOutput"] == value:
            return value
        raise Exception("Error setting deadband: " + str(ret))

    def move_pos(self, pos):
        """Request a move to pos (sent as the "pos" query parameter)."""
        return self.get_response(requests.get(
            url=self.url_set + "set?pos=" + str(pos), timeout=self.timeout))

    def move_home(self):
        """Request homing and return the decoded reply.

        After the request, waits for the state to equal self.homing_state;
        a failure of that wait is ignored on purpose (best effort).
        """
        ret = self.get_response(requests.get(
            url=self.url_set + "set?home=1", timeout=self.timeout))
        try:
            self.waitState(self.homing_state, 1200)
        except:
            pass
        return ret

    def stop_move(self):
        """Request the device to stop and return the decoded reply."""
        return self.get_response(requests.get(
            url=self.url_set + "set?stop=1", timeout=self.timeout))

    def set_offset(self, offset):
        """Send a new offset to the device and return the decoded reply."""
        return self.get_response(requests.get(
            url=self.url_set + "setOffset?offset=" + str(offset),
            timeout=self.timeout))

    def doUpdate(self):
        """Refresh the status, then run the base-class update."""
        self.get_status()
        super(Hexiposi, self).doUpdate()

    def doStop(self):
        """Stop hook: forwards to stop_move()."""
        self.stop_move()

    def doRead(self):
        """Return the current setpoint as a string."""
        return str(self.val)

    def doReadReadback(self):
        """Refresh the status and return the readback position."""
        self.get_status()
        return self.rback

    def doWrite(self, val):
        """Move to the position named val ("A".."F").

        The name is converted to the 1..6 index sent to the device.  The
        setpoint is stored as the position name, so doRead() keeps returning
        a name and the comparison with the previous setpoint is
        like-for-like.  Raises Exception when the index is outside 1..6.
        """
        # Imported locally so the method does not rely on 'time' being
        # provided by the surrounding script namespace.
        import time

        index = ord(val) - ord('A') + 1
        if index < 1 or index > 6:
            raise Exception("Invalid value: " + str(index))
        moving = val != self.val
        self.val = val
        self.move_pos(index)
        # Workaround as the state does not change immediately: poll until the
        # device reports Busy, giving up after 1.5 seconds.
        if moving:
            start = time.time()
            while self.state != State.Busy:
                if time.time() - start > 1.5:
                    print("Timeout waiting Hexiposi busy")
                    break
                self.update()

    def is_in_position(self, pos):
        """Return True when take() equals pos.

        NOTE(review): take() is inherited; presumably it returns the cached
        device value - confirm against the base class.
        """
        return self.take() == pos

    def assert_in_position(self, pos):
        """Raise Exception unless is_in_position(pos) is true."""
        if not self.is_in_position(pos):
            raise Exception("Hexiposi is not in position")

    def assert_homed(self):
        """Raise Exception when the last status reported homed == False."""
        if self.homed == False:
            raise Exception("Hexiposi is not homed")

    #def isReady(self):
    #    self.get_status()
    #    return self.moving == False

    def updateState(self):
        """Derive the device state from the cached flags.

        Precedence: offline, then not homed (self.homing_state), then moving
        (Busy), otherwise Ready.
        """
        if self.offline:
            self.setState(State.Offline)
        elif self.homed == False:
            self.setState(self.homing_state)
        elif self.moving:
            self.setState(State.Busy)
        else:
            self.setState(State.Ready)
|
|
|
|
|
|
|
|
# Status URL example: http://myriotell:8003/hexiposi/get
# NOTE(review): this URL mentions port 8003 while the Hexiposi class builds
# its URLs with port 8002 - confirm which one is current.
dev = Hexiposi("hexiposi", "myriotell")

# Register the device, then set its polling value through the name
# "hexiposi" (not defined in this file; presumably made available by
# add_device - confirm).
add_device(dev, True)
hexiposi.polling = 1000

# Debugging helpers, kept disabled:
#print dev.url
#print dev.get_status()
|
|
|
|
class hexiposi_position(ReadonlyRegisterBase):
    """Read-only register exposing the Hexiposi position as a float."""

    def doRead(self):
        """Return hexiposi.pos converted to float, or NaN when that fails."""
        reading = float("nan")
        try:
            reading = float(hexiposi.pos)
        except:
            # Any failure (missing value, failed conversion, ...) yields NaN.
            pass
        return reading
|
|
|
|
# Register the position register and set its polling value.
# NOTE(review): after add_device the name "hexiposi_position" is used as if
# it referred to the registered device rather than the class - confirm.
add_device(hexiposi_position(), True)
hexiposi_position.polling = 1000

# Apply a deadband of 0.5 (degrees, per set_deadband) when the script loads.
hexiposi.set_deadband(0.5)
|