Files
sics/site_ansto/instrument/TEST_SICS/fakeGalil/galilcontroller.py
2013-02-21 14:26:26 +11:00

216 lines
7.8 KiB
Python

#!/usr//bin/env python
# vim: ts=8 sts=4 sw=4 expandtab
# Author: Douglas Clowes (dcl@ansto.gov.au) 2012-06-29
import string
import time
import re
from galilmotor import GalilMotor
class GalilController(object):
"""Galil Controller motor controller object"""
def __init__(self, thePort):
print "GalilController ctor: %d" % int(thePort)
self.port = thePort
self.motors = {}
for letter in "ABCDEFGH":
self.motors[letter] = GalilMotor(letter)
self.reset_powerup()
def reset_powerup(self):
print self.__class__.__name__, "reset_powerup"
self.variables = {}
def write(self, response):
#print "Device Response: %s" % response
self.response.append(response)
def doIteration(self, now):
#print "DoIteration"
# pass to motors
for letter in "ABCDEFGH":
self.motors[letter].doIteration(now)
def doAssignment(self, cmd, arg):
action = cmd[0:2]
axis = cmd[2:]
#print "Assignment %s(%s) = %s" % (action, axis, arg)
if action in ["PA", "SP", "AC", "DC"]:
if len(axis) == 1 and axis in "ABCDEFGH":
self.motors[axis].doAssignment(action, arg)
return
self.variables[cmd] = arg
def doCommandMG(self, cmd, arg):
#print cmd, arg
if (arg[0] == "\"" or arg[1] == "'") and arg[0] == arg[-1]:
arg = arg[1:-1]
if re.match("^CONFIG[A-H]=", arg):
the_axis = arg[6]
the_config = arg[8:]
self.motors[the_axis].doConfigure(the_config)
self.write(arg)
return
if re.match("^TRACE[A-H]=", arg):
the_axis = arg[5]
the_config = arg[7:]
self.motors[the_axis].doTrace(the_config)
self.write(arg)
return
if re.match("^DROP", arg):
self.protocol.transport.loseConnection();
return
# TODO replace this simple stuff with more complex
if re.match("^{[^}]+}", arg):
the_match = re.match("^({[^}]+})(.*)$", arg)
the_format = the_match.group(1)
the_rest = string.strip(the_match.group(2))
arg = the_rest
response = []
for item in arg.split(","):
item = string.strip(item)
#print "Item:", item
if item == "_TI0":
response.append(" 0000000000")
elif item == "_TI1":
response.append(" 0000000000")
elif item == "_XQ0":
response.append(" 0000000123")
elif len(item) == 4 and item[0] == "_":
axis = arg[3]
param = self.motors[axis].doQuery(item[1:3])
if param < 0:
paramstr = "-" + ("%010i" % abs(param))
else:
paramstr = " " + ("%010i" % param)
response.append(paramstr)
result = string.join(response, "")
#print result
self.write(result)
def doMotorCommand(self, cmd, arg):
# TODO replace this simple stuff with more complex
action = cmd[0:2]
axes = cmd[2:] + arg
for axis in axes:
self.motors[axis].doCommand(action, None)
def doCommandLV(self, cmd, arg):
code = "SIMULATE"
self.variables[code] = "1"
keys = self.variables.keys()
keys.sort()
for key in keys:
self.write("%s= %s" % (key, self.variables[key]))
del self.variables[code]
def doCommandDA(self, cmd, arg):
key = cmd[3:]
if key in self.variables.keys():
del self.variables[key]
def doCommand(self, cmd, arg):
action = cmd[0:2]
#print "Command %s(%s) = %s" % (action, cmd[2:], arg)
if action in ["BG", "ST", "TP", "TD", "SH", "MO"]:
self.doMotorCommand(cmd, arg)
if action == "LV":
self.doCommandLV(cmd, arg)
return
elif action == "DA":
self.doCommandDA(cmd, arg)
elif action == "MG":
self.doCommandMG(cmd, arg)
def processDataReceived(self, data):
#print "PDU: \"" + data + "\""
self.response = []
the_match = re.match("^([A-Z0-9_*]+)(.*)$", data)
if the_match and the_match.group(2)[:1] == "=":
cmd = the_match.group(1)
arg = the_match.group(2)[1:]
self.doAssignment(cmd, arg)
elif re.match("([A-Z][A-Z]) *(.*)$", data):
match = re.match("([A-Z][A-Z]) *(.*)$", data)
cmd = match.group(1)
arg = match.group(2)
self.doCommand(cmd, arg)
else:
print "Command not recognized:", data
cmd = "Command"
arg = "Unknown"
# TODO replace this simple stuff with more complex
#print "OUT: \"" + self.response + "\""
return self.response
def dataReceived(self, data):
self.doIteration(time.time())
return self.processDataReceived(data)
if __name__ == '__main__':
totalDistance = 100000
mc = GalilController(0)
print "LV", mc.processDataReceived("LV")
print "MG", mc.processDataReceived("MG \"TRACEA=ON\"")
print "MG", mc.processDataReceived("MG \"CONFIGA=SPX=25000.0,CPX=8192.0,RL=-10.0,FL=10.0,UH=0.0,EH=100000000\"")
print "SP", mc.processDataReceived("SPA=25000")
print "AC", mc.processDataReceived("ACA=25000")
print "DC", mc.processDataReceived("DCA=25000")
print "PA", mc.processDataReceived("PAA=%d" % totalDistance)
start_position = int(mc.processDataReceived("MG _TDA")[0])
print "SH", mc.processDataReceived("SHA")
print "BG", mc.processDataReceived("BGA")
startTime = time.time()
currentDistance = 0
print "MO", mc.processDataReceived("MG _MOA")
print "BG", mc.processDataReceived("MG _BGA")
count = 0
#previousTime = 0
while int(mc.processDataReceived("MG _BGA")[0]) == 1 and count <= 100:
t = 0.1 * count
mc.doIteration(mc.motors["A"].motorStartTime + t)
rnge = 70
temp = currentDistance
currentDistance = abs(int(mc.processDataReceived("MG _TDA")[0]) - start_position)
#percentDistance = 100.0 * currentDistance / totalDistance
#percentVelocity = 100.0 * (currentDistance - temp) / (t - previousTime) / mc.processDataReceived("MG _SPA")[0]
#previousTime = t
percentDistance = mc.motors["A"].percentDistance
percentVelocity = mc.motors["A"].percentVelocity
vspot = int(0.01 * rnge * percentVelocity)
sspot = int(0.01 * rnge * percentDistance)
bth = "b"
if sspot >= 0:
ess = "s"
else:
ess = "*"
bth = "B"
sspot = 0
if vspot >= 0:
vee = "v"
else:
vee = "^"
bth = "B"
vspot = 0
if vspot < sspot:
line = " "*(vspot) + vee + " "*(sspot - vspot - 1) + ess + " "*(rnge - sspot) + "|"
line += " %2d %2d" % (vspot, sspot)
elif vspot > sspot:
line = " "*(sspot) + ess + " "*(vspot - sspot - 1) + vee + " "*(rnge - vspot) + "|"
line += " %2d %2d" % (sspot, vspot)
else:
line = " "*(sspot) + bth + " "*(rnge - sspot) + "|"
line += " %2d" % (sspot)
print "%3.2f: %s" % (t, line)
count += 1
print "_MO", mc.processDataReceived("MG _MOA")
print "_BG", mc.processDataReceived("MG _BGA")
print "_TP", mc.processDataReceived("MG _TPA")
print "_TD", mc.processDataReceived("MG _TDA")
print "MO", mc.processDataReceived("MOA")
print "_MO", mc.processDataReceived("MG _MOA")
print "ZZ", mc.processDataReceived("ZZA")
print "_ZZ", mc.processDataReceived("MG _ZZA")
print "MG", mc.processDataReceived("MG _MOA, _BGA, _TDA, _TPA, _SPA, _ACA, _DCA")
print "Iterations:", count