Addpython motor simulation for taipan

This commit is contained in:
Douglas Clowes
2013-02-21 14:26:26 +11:00
parent e7a478489e
commit a00ca3872f
6 changed files with 904 additions and 0 deletions

View File

@@ -0,0 +1,91 @@
#!/usr//bin/env python
# vim: ts=8 sts=4 sw=4 expandtab
# Author: Douglas Clowes (dcl@ansto.gov.au) 2012-06-29
from twisted.internet import reactor
from twisted.python import log
from twisted.internet.task import LoopingCall
import random
import re
import sys
import time
import inspect
from galilfactory import GalilFactory
from counterclient import CounterClientFactory
from powdersample import CubicPowderSample
log.startLogging(sys.stdout)
beam_monitor_counter = None
detector_counter = None
beam_monitor_rate = None
nickel = CubicPowderSample()
def stopCallbackM1M2(motor):
global nickel
global beam_monitor_rate
mu = m2.getPosition() / 2.0
nickel.set_wavelength_from_theta_pg002(mu)
x = m1.getPosition()
y = nickel.funx(abs(x), abs(mu), 0.2)
baseline = 1000
height = int(5000 * y) + baseline
beam_monitor_rate = height
print "My Height at", x, "is", height
if beam_monitor_counter:
beam_monitor_counter.setCountRate(height)
if detector_counter:
stopCallbackS2(m1)
print "Motor M1, M2 at:", m1.getPosition(), m2.getPosition(), ", wavelength=", nickel.wavelength, ", mEv=", nickel.mEv
def stopCallbackS2(motor):
global nickel
global beam_monitor_rate
if beam_monitor_rate == None:
m1.stopCallback(m1)
x = s2.getPosition()
y = nickel.calc(abs(x))
baseline = 2000
height = int(beam_monitor_rate * y) + baseline
print "My Height at", x, "is", height
if detector_counter:
detector_counter.setCountRate(height)
print "Motor S2 at:", s2.getPosition()
def device_iterator():
now = time.time()
m1.doIteration(now)
m2.doIteration(now)
s2.doIteration(now)
basePort = 62034 # Echidna
basePort = 62134 # Wombat
basePort = 62430 # Quokka
basePort = 62730 # Taipan
devices = {}
for dev in range(0, 6):
port = basePort + dev
controllerName = "mc%d" % (dev + 1)
factory = GalilFactory(port)
devices[controllerName] = factory.device
reactor.listenTCP(port, factory)
s2 = devices["mc2"].motors["F"]
s2.moveCallback = s2.stopCallback = stopCallbackS2
m1 = devices["mc3"].motors["E"]
m1.moveCallback = m1.stopCallback = stopCallbackM1M2
m2 = devices["mc1"].motors["F"]
m2.moveCallback = m2.stopCallback = stopCallbackM1M2
m1.stopCallback(m1)
m2.stopCallback(m2)
s2.stopCallback(s2)
beam_monitor_counter = CounterClientFactory()
detector_counter = CounterClientFactory()
connector1 = reactor.connectTCP("localhost", 33001, beam_monitor_counter)
connector2 = reactor.connectTCP("localhost", 33000, detector_counter)
print "Connector1:", connector1.getDestination()
print "Connector2:", connector2.getDestination()
lc = LoopingCall(device_iterator)
lc.start(0.05)
reactor.run()

View File

@@ -0,0 +1,52 @@
#!/usr//bin/env python
# vim: ts=8 sts=4 sw=4 expandtab
# Author: Douglas Clowes (dcl@ansto.gov.au) 2012-06-29
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import ReconnectingClientFactory
myCounters = {}
class Counter(Protocol):
def dataReceived(self, data):
print "Data:", data
def connectionMade(self):
print "connectionMade", self.transport.getHost(), self.transport.getPeer()
def connectionLost(self, reason):
print "connectionLost"
def sendMessage(self, m):
self.transport.write(m)
class CounterClientFactory(ReconnectingClientFactory):
def __init__(self):
self.device = None
def startedConnecting(self, connector):
print 'Started to connect.'
def buildProtocol(self, addr):
global myCounters
print 'Connected to:', addr
self.resetDelay()
p = Counter()
myCounters[addr.port] = p
self.device = p
print "My Counters", myCounters
return p
def clientConnectionLost(self, connector, reason):
global myCounters
print 'Lost connection', connector.getDestination().port,'Reason:', reason
del myCounters[connector.getDestination().port]
self.device = None
ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
def clientConnectionFailed(self, connector, reason):
print 'Connection failed. Reason:', reason
ReconnectingClientFactory.clientConnectionFailed(self, connector,reason)
def setCountRate(self, rate):
if self.device:
self.device.sendMessage("SET SOURCE=%d\r\n" % rate)

View File

@@ -0,0 +1,215 @@
#!/usr//bin/env python
# vim: ts=8 sts=4 sw=4 expandtab
# Author: Douglas Clowes (dcl@ansto.gov.au) 2012-06-29
import string
import time
import re
from galilmotor import GalilMotor
class GalilController(object):
"""Galil Controller motor controller object"""
def __init__(self, thePort):
print "GalilController ctor: %d" % int(thePort)
self.port = thePort
self.motors = {}
for letter in "ABCDEFGH":
self.motors[letter] = GalilMotor(letter)
self.reset_powerup()
def reset_powerup(self):
print self.__class__.__name__, "reset_powerup"
self.variables = {}
def write(self, response):
#print "Device Response: %s" % response
self.response.append(response)
def doIteration(self, now):
#print "DoIteration"
# pass to motors
for letter in "ABCDEFGH":
self.motors[letter].doIteration(now)
def doAssignment(self, cmd, arg):
action = cmd[0:2]
axis = cmd[2:]
#print "Assignment %s(%s) = %s" % (action, axis, arg)
if action in ["PA", "SP", "AC", "DC"]:
if len(axis) == 1 and axis in "ABCDEFGH":
self.motors[axis].doAssignment(action, arg)
return
self.variables[cmd] = arg
def doCommandMG(self, cmd, arg):
#print cmd, arg
if (arg[0] == "\"" or arg[1] == "'") and arg[0] == arg[-1]:
arg = arg[1:-1]
if re.match("^CONFIG[A-H]=", arg):
the_axis = arg[6]
the_config = arg[8:]
self.motors[the_axis].doConfigure(the_config)
self.write(arg)
return
if re.match("^TRACE[A-H]=", arg):
the_axis = arg[5]
the_config = arg[7:]
self.motors[the_axis].doTrace(the_config)
self.write(arg)
return
if re.match("^DROP", arg):
self.protocol.transport.loseConnection();
return
# TODO replace this simple stuff with more complex
if re.match("^{[^}]+}", arg):
the_match = re.match("^({[^}]+})(.*)$", arg)
the_format = the_match.group(1)
the_rest = string.strip(the_match.group(2))
arg = the_rest
response = []
for item in arg.split(","):
item = string.strip(item)
#print "Item:", item
if item == "_TI0":
response.append(" 0000000000")
elif item == "_TI1":
response.append(" 0000000000")
elif item == "_XQ0":
response.append(" 0000000123")
elif len(item) == 4 and item[0] == "_":
axis = arg[3]
param = self.motors[axis].doQuery(item[1:3])
if param < 0:
paramstr = "-" + ("%010i" % abs(param))
else:
paramstr = " " + ("%010i" % param)
response.append(paramstr)
result = string.join(response, "")
#print result
self.write(result)
def doMotorCommand(self, cmd, arg):
# TODO replace this simple stuff with more complex
action = cmd[0:2]
axes = cmd[2:] + arg
for axis in axes:
self.motors[axis].doCommand(action, None)
def doCommandLV(self, cmd, arg):
code = "SIMULATE"
self.variables[code] = "1"
keys = self.variables.keys()
keys.sort()
for key in keys:
self.write("%s= %s" % (key, self.variables[key]))
del self.variables[code]
def doCommandDA(self, cmd, arg):
key = cmd[3:]
if key in self.variables.keys():
del self.variables[key]
def doCommand(self, cmd, arg):
action = cmd[0:2]
#print "Command %s(%s) = %s" % (action, cmd[2:], arg)
if action in ["BG", "ST", "TP", "TD", "SH", "MO"]:
self.doMotorCommand(cmd, arg)
if action == "LV":
self.doCommandLV(cmd, arg)
return
elif action == "DA":
self.doCommandDA(cmd, arg)
elif action == "MG":
self.doCommandMG(cmd, arg)
def processDataReceived(self, data):
#print "PDU: \"" + data + "\""
self.response = []
the_match = re.match("^([A-Z0-9_*]+)(.*)$", data)
if the_match and the_match.group(2)[:1] == "=":
cmd = the_match.group(1)
arg = the_match.group(2)[1:]
self.doAssignment(cmd, arg)
elif re.match("([A-Z][A-Z]) *(.*)$", data):
match = re.match("([A-Z][A-Z]) *(.*)$", data)
cmd = match.group(1)
arg = match.group(2)
self.doCommand(cmd, arg)
else:
print "Command not recognized:", data
cmd = "Command"
arg = "Unknown"
# TODO replace this simple stuff with more complex
#print "OUT: \"" + self.response + "\""
return self.response
def dataReceived(self, data):
self.doIteration(time.time())
return self.processDataReceived(data)
if __name__ == '__main__':
totalDistance = 100000
mc = GalilController(0)
print "LV", mc.processDataReceived("LV")
print "MG", mc.processDataReceived("MG \"TRACEA=ON\"")
print "MG", mc.processDataReceived("MG \"CONFIGA=SPX=25000.0,CPX=8192.0,RL=-10.0,FL=10.0,UH=0.0,EH=100000000\"")
print "SP", mc.processDataReceived("SPA=25000")
print "AC", mc.processDataReceived("ACA=25000")
print "DC", mc.processDataReceived("DCA=25000")
print "PA", mc.processDataReceived("PAA=%d" % totalDistance)
start_position = int(mc.processDataReceived("MG _TDA")[0])
print "SH", mc.processDataReceived("SHA")
print "BG", mc.processDataReceived("BGA")
startTime = time.time()
currentDistance = 0
print "MO", mc.processDataReceived("MG _MOA")
print "BG", mc.processDataReceived("MG _BGA")
count = 0
#previousTime = 0
while int(mc.processDataReceived("MG _BGA")[0]) == 1 and count <= 100:
t = 0.1 * count
mc.doIteration(mc.motors["A"].motorStartTime + t)
rnge = 70
temp = currentDistance
currentDistance = abs(int(mc.processDataReceived("MG _TDA")[0]) - start_position)
#percentDistance = 100.0 * currentDistance / totalDistance
#percentVelocity = 100.0 * (currentDistance - temp) / (t - previousTime) / mc.processDataReceived("MG _SPA")[0]
#previousTime = t
percentDistance = mc.motors["A"].percentDistance
percentVelocity = mc.motors["A"].percentVelocity
vspot = int(0.01 * rnge * percentVelocity)
sspot = int(0.01 * rnge * percentDistance)
bth = "b"
if sspot >= 0:
ess = "s"
else:
ess = "*"
bth = "B"
sspot = 0
if vspot >= 0:
vee = "v"
else:
vee = "^"
bth = "B"
vspot = 0
if vspot < sspot:
line = " "*(vspot) + vee + " "*(sspot - vspot - 1) + ess + " "*(rnge - sspot) + "|"
line += " %2d %2d" % (vspot, sspot)
elif vspot > sspot:
line = " "*(sspot) + ess + " "*(vspot - sspot - 1) + vee + " "*(rnge - vspot) + "|"
line += " %2d %2d" % (sspot, vspot)
else:
line = " "*(sspot) + bth + " "*(rnge - sspot) + "|"
line += " %2d" % (sspot)
print "%3.2f: %s" % (t, line)
count += 1
print "_MO", mc.processDataReceived("MG _MOA")
print "_BG", mc.processDataReceived("MG _BGA")
print "_TP", mc.processDataReceived("MG _TPA")
print "_TD", mc.processDataReceived("MG _TDA")
print "MO", mc.processDataReceived("MOA")
print "_MO", mc.processDataReceived("MG _MOA")
print "ZZ", mc.processDataReceived("ZZA")
print "_ZZ", mc.processDataReceived("MG _ZZA")
print "MG", mc.processDataReceived("MG _MOA, _BGA, _TDA, _TPA, _SPA, _ACA, _DCA")
print "Iterations:", count

View File

@@ -0,0 +1,88 @@
#!/usr//bin/env python
# vim: ts=8 sts=4 sw=4 expandtab
# Author: Douglas Clowes (dcl@ansto.gov.au) 2012-06-29
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import ServerFactory
from galilcontroller import GalilController
class GalilProtocol(Protocol):
"""Protocol object used by the Twisted Infrastructure to handle connections
"""
def __init__(self, thisDevice):
print "GalilProtocol protocol ctor", thisDevice
self.device = thisDevice
self.term = "\r\n"
def write(self, response):
self.response = self.response + response
def connectionMade(self):
self.pdu = ""
self.response = ""
self.device.protocol = self
self.factory.numProtocols = self.factory.numProtocols + 1
print "connectionMade: connection number", self.factory.numProtocols
if self.factory.numProtocols > 2:
print "Too many connections - rejecting"
self.transport.write("Too many connections, try later\r\n")
self.transport.loseConnection()
'''else:
self.transport.write("Welcome connection %d\r\n" % self.factory.numProtocols)'''
def connectionLost(self, reason):
self.factory.numProtocols = self.factory.numProtocols - 1
print "connectionLost: connections left", self.factory.numProtocols
def dataReceived(self, data):
#print "dataReceived - len:", len(data)
last_was_cr = False
for c in data:
if c == "\r" or c == ";" or c == "\n":
if c == "\n" and last_was_cr:
continue
last_was_cr = False
if c == "\r":
last_was_cr = True
if len(self.pdu) > 0:
#print "Request: ", self.pdu
the_response = self.device.dataReceived(self.pdu)
#print "Response:", the_response
for item in the_response:
self.write(item + self.term)
self.pdu = ""
if c == ";":
if len(self.response) > 0 and self.response[-1] != ":":
self.response = self.response + ":"
else:
if len(self.response) > 0:
if self.response[-1] == ";":
self.response = self.response[:-1]
if len(self.response) > 0 and self.response[-1] != ":":
self.response = self.response + ":"
if len(self.response) == 0:
self.response = self.response + ":"
if len(self.response) > 0:
#print "Protocol Response: %s" % self.response
self.transport.write(self.response)
self.response = ""
else:
self.pdu = self.pdu + c
class GalilFactory(ServerFactory):
"""Factory object used by the Twisted Infrastructure to create a Protocol
object for incomming connections"""
protocol = GalilProtocol
def __init__(self, thePort):
print "GalilFactory ctor: "
self.port = thePort
self.device = GalilController(thePort)
self.numProtocols = 0
def buildProtocol(self, addr):
p = self.protocol(self.device)
p.factory = self
return p

View File

@@ -0,0 +1,350 @@
#!/usr//bin/env python
# vim: ts=8 sts=4 sw=4 expandtab
# Author: Douglas Clowes (dcl@ansto.gov.au) 2012-06-29
import inspect
import time
import math
class GalilMotor(object):
"""Galil Motor Object"""
def __init__(self, theAxis):
print "GalilMotor ctor: %s" % theAxis
self.axis = theAxis
self.motorState = "OFF"
self.stepsPerX = 25000.0
self.cntsPerX = 8192.0
self.speed = 25000
self.accel = 2500
self.decel = 2500
self.lowerLim = -20.0
self.upperLim = 20.0
self.hasAbsEnc = True
self.unitsHome = 0.0
self.absEncHome = 100 * 1000 * 1000
self.motorHome = 0.0
self.currentSteps = 0
self.targetSteps = self.currentSteps
self.configDone = False
self.switches = 44
self.stopcode = 0
self.stopCallback = None
self.moveCallback = None
self.trace = False
def doTrace(self, arg):
print "Trace %s: %s" % (self.axis, arg),
if self.trace:
print "(ON =>",
else:
print "(OFF =>",
if arg == "ON":
self.trace = True
else:
self.trace = False
if self.trace:
print "ON)"
else:
print "OFF)"
def doConfigure(self, arg):
print "Configure %s: %s" % (self.axis, arg)
the_parts = arg.split(",")
configDoneAlready = self.configDone
for part in the_parts:
(lh, rh) = part.split("=")
if lh == "SPX":
self.stepsPerX = float(rh)
self.configDone = True
elif lh == "CPX":
self.cntsPerX = float(rh)
self.configDone = True
elif lh == "RL":
self.lowerLim = float(rh)
self.configDone = True
elif lh == "FL":
self.upperLim = float(rh)
self.configDone = True
elif lh == "UH":
self.unitsHome = float(rh)
self.configDone = True
elif lh == "EH":
self.hasAbsEnc = True
self.absEncHome = float(rh)
self.configDone = True
elif lh == "MH":
self.hasAbsEnc = False
self.motorHome = float(rh)
self.configDone = True
else:
print "Unrecognized config item \"%s\" in \"%s\"" % (lh, arg)
if self.configDone and not configDoneAlready:
self.currentSteps = 0
self.targetSteps = 0
def getEncoder(self):
'''Calculate the encoder from the currentSteps'''
value = self.absEncHome + self.currentSteps * (self.cntsPerX / self.stepsPerX)
return int(value)
def getPosition(self):
'''Calculate the unit position'''
value = self.unitsHome + self.currentSteps / self.stepsPerX
value = self.getEncoder() - self.absEncHome
value /= self.cntsPerX
value += self.unitsHome
return value
def doIteration(self, now):
'''If moving then update speed and position using
v = u + at
s = ut + 1/2 at**2 = (u + v)t / 2
v**2 = u**2 + 2as
'''
if self.motorState in ["MOVING"]:
t = now - self.motorStartTime
distance = abs(self.targetSteps - self.motorStartSteps)
topSpeed = math.sqrt(2.0 * distance / ( 1.0 / self.accel + 1.0 / self.decel))
if topSpeed < 1e-6:
topSpeed = 0
if topSpeed > self.speed:
topSpeed = self.speed
t1 = topSpeed / self.accel
s1 = 0.5 * self.accel * t1**2
t3 = topSpeed / self.decel
s3 = 0.5 * self.decel * t3**2
s2 = distance - s1 - s3
if s2 < 1e-6:
s2 = 0
if s2 == 0 or topSpeed == 0:
t2 = 0
else:
t2 = s2 / topSpeed
if t >= (t1 + t2 + t3):
# Finished
s = distance
v = 0
self.motorState = "STOPPED"
self.stopcode = 1
self.switches = 44 # stopped, both limits clear
elif t >= (t1 + t2):
# Ramp Down
dt = t - (t1 + t2)
# v = u + at
v = topSpeed - self.decel * dt
# s = (u + v)t / 2
s = s1 + s2 + 0.5 * (topSpeed + v) * dt
elif t >= t1:
# Run Flat
v = topSpeed
s = s1 + topSpeed * (t - t1)
else:
# Ramp Up
v = 0.0 + self.accel * t
s = 0.5 * self.accel * t**2
if self.speed < 1e-6:
self.percentVelocity = 100.0
else:
self.percentVelocity = 100.0 * v / self.speed
if distance < 1e-6:
self.percentDistance = 100.0
else:
self.percentDistance = 100.0 * s / distance
if self.targetSteps > self.motorStartSteps:
self.currentSteps = self.motorStartSteps + int(s)
if self.currentSteps >= self.targetSteps:
self.motorState = "STOPPED"
self.stopcode = 1
self.switches = 44 # stopped, both limits clear
elif self.targetSteps < self.motorStartSteps:
self.currentSteps = self.motorStartSteps - int(s)
if self.currentSteps <= self.targetSteps:
self.motorState = "STOPPED"
self.stopcode = 1
self.switches = 44 # stopped, both limits clear
else:
self.motorState = "STOPPED"
self.stopcode = 1
self.switches = 44 # stopped, both limits clear
currentPosition = self.getPosition()
if not self.moveCallback == None:
self.moveCallback(self)
# TODO Forward Limit
# TODO Reverse Limit
if self.motorState == "STOPPED":
print "Axis %s stopped at:" % self.axis, currentPosition
self.currentSteps = self.targetSteps
if not self.stopCallback == None:
self.stopCallback(self)
else:
pass
def doAssignment(self, cmd, arg):
'''This is for commands like "PAA=1234567" which are for the motor.
The axis and the assignment operator have already been removed.
'''
if self.trace:
print "Trace ASG %s: %s" % (self.axis, str(cmd) + " " + str(arg))
if cmd == "PA":
self.targetSteps = int(arg)
elif cmd == "PR":
self.targetSteps = self.currentSteps + int(arg)
elif cmd == "SP":
self.speed = max(1, abs(int(arg)))
elif cmd == "AC":
self.accel = max(1, abs(int(arg)))
elif cmd == "DC":
self.decel = max(1, abs(int(arg)))
else:
print "Unknown assignment", cmd, arg
def doQuery(self, arg):
if arg == "TD":
response = self.currentSteps
elif arg == "ZZ":
response = self.getPosition()
elif arg == "TP":
response = self.getEncoder()
elif arg == "SP":
response = self.speed
elif arg == "AC":
response = self.accel
elif arg == "DC":
response = self.decel
elif arg == "TS":
response = self.switches
elif arg == "SC":
response = self.stopcode
elif arg == "MO":
if self.motorState in ["OFF"]:
response = 1
else:
response = 0
elif arg == "BG":
if self.motorState in ["MOVING"]:
response = 1
else:
response = 0
else:
print "Unknown Query:", arg
response = None
if self.trace:
print "Trace QRY %s: %s" % (self.axis, str(arg) + " = " + str(response))
return response
def doCommandTD(self, arg):
return self.doQuery("TD")
def doCommandZZ(self, arg):
result = self.doQuery("ZZ")
print "Position:", result
return result
def doCommandTP(self, arg):
return self.doQuery("TP")
def doCommandST(self, arg):
if self.motorState in ["MOVING"]:
self.motorState = "STOPPED"
self.stopcode = 4
self.switches = 44
return True
def doCommandBG(self, arg):
if self.motorState in ["ON", "STOPPED"]:
self.motorState = "MOVING"
self.stopcode = 0
self.switches = 140 # moving, both limits clear
self.motorStartTime = time.time()
self.motorStartSteps = self.currentSteps
return True
return False
def doCommandSH(self, arg):
if self.motorState in ["ON", "OFF", "STOPPED"]:
self.motorState = "ON"
return True
return False
def doCommandMO(self, arg):
if self.motorState in ["ON", "OFF", "STOPPED"]:
self.motorState = "OFF"
return True
return False
def doCommandPA(self, arg):
self.targetSteps = int(arg)
return True
def doCommandPR(self, arg):
self.targetSteps = self.currentSteps + int(arg)
return True
def doCommand(self, cmd, arg):
'''This is for commands like "BGA" which are for the motor.
The axis and other cruft have already been removed.
'''
action = "inspect.ismethod(GalilMotor.doCommand%s)" % cmd
if eval(action):
if self.trace:
data = cmd
if arg:
data = data + " " + arg
print "Trace CMD: \"" + data + "\""
action = "response = self.doCommand%s(%s)" % (cmd, arg)
exec action
return response
else:
print "Unknown Action:", eval(action), "for cmd:", action
if __name__ == '__main__':
motor = GalilMotor("A")
motor.doConfigure("SPX=25000.0,CPX=8192.0,RL=-10.0,FL=10.0,UH=0.0,EH=100000000")
print "SP", motor.doAssignment("SP", "25000")
print "AC", motor.doAssignment("AC", "25000")
print "DC", motor.doAssignment("DC", "25000")
print "PA", motor.doAssignment("PA", "100000")
print "SH", motor.doCommand("SH", None)
print "BG", motor.doCommand("BG", None)
print "MO", motor.doQuery("MO")
print "BG", motor.doQuery("BG")
count = 0
while motor.doQuery("BG") and count <= 100:
t = 0.1 * count
motor.doIteration(motor.motorStartTime + t)
rnge = 70
vspot = int(0.01 * rnge * motor.percentVelocity)
sspot = int(0.01 * rnge * motor.percentDistance)
bth = "b"
if sspot >= 0:
ess = "s"
else:
ess = "*"
bth = "B"
sspot = 0
if vspot >= 0:
vee = "v"
else:
vee = "^"
bth = "B"
vspot = 0
if vspot < sspot:
line = " "*(vspot) + vee + " "*(sspot - vspot - 1) + ess + " "*(rnge - sspot) + "|"
line += " %2d %2d" % (vspot, sspot)
elif vspot > sspot:
line = " "*(sspot) + ess + " "*(vspot - sspot - 1) + vee + " "*(rnge - vspot) + "|"
line += " %2d %2d" % (sspot, vspot)
else:
line = " "*(sspot) + bth + " "*(rnge - sspot) + "|"
line += " %2d" % (sspot)
print "%3.2f: %s" % (t, line)
count += 1
print "_MO", motor.doQuery("MO")
print "_BG", motor.doQuery("BG")
print "TP", motor.doCommand("TP", None)
print "TD", motor.doCommand("TD", None)
print "ZZ", motor.doCommand("ZZ", None)
print "_ZZ", motor.doQuery("ZZ")
print "MO", motor.doCommand("MO", None)
print "_MO", motor.doQuery("MO")
print "Iterations:", count

View File

@@ -0,0 +1,108 @@
#!/usr//bin/env python
# vim: ts=8 sts=4 sw=4 expandtab
# Author: Douglas Clowes (dcl@ansto.gov.au) 2012-07-18
import math
class CubicPowderSample:
def __init__(self):
self.aspacing = 3.5238 # Angstroms
self.wavelength = 2.34548 # Angstroms
self.wavelength_error = 0.00 # Angstroms
self.peakpos_error = 0.0 # degrees
self.mEv = 0.0
def set_wavelength_from_theta_pg002(self, theta):
dd = 3.35416e-10 # For Monochromator PG 0 0 2
h_for_joules = 6.62606957e-34
neutron_mass = 1.674927351e-27
ev_to_joules = 1.602176565e-19
angle = theta
myWavelength = 2.0 * dd * math.sin(angle * 3.141592654 / 180.0)
if myWavelength > 0:
temp = h_for_joules / (neutron_mass * myWavelength)
joules = 0.5 * neutron_mass * (temp * temp)
self.mEv = 1000 * joules / ev_to_joules
self.wavelength = 1e10 * myWavelength
else:
self.mEv = 0.0
self.wavelength = 0.0
print "theta = %g, wl = %g, mEv = %g" % (angle, self.wavelength, self.mEv)
def atod(self, the_aspacing, h, k, l):
result = math.sqrt(the_aspacing**2 /(h**2 + k**2 + l**2))
return result
def peak_position(self, dspacing, wavelength):
try:
theta = math.asin(wavelength / (2.0 * dspacing)) * 180 / 3.141592654
return 2.0 * theta
except:
print "error in peak_position(", dspacing, wavelength, ")"
return 0.0
def funx(self, x, mu, sigma):
try:
factor = 1.0 / (sigma * math.sqrt(2.0 * math.pi))
y = factor * math.exp(-0.5 * ((x - mu) / sigma)**2)
return y
except:
print "error in func(", x, mu, sigma, ")"
return 0.0
def calc(self, x):
peaks = []
peaks.append([0.5, 0.5, 0.5, 1.0])
peaks.append([1, 0, 0, 0.5])
peaks.append([1, 1, 0, 0.8])
peaks.append([1, 1, 1, 0.9])
peaks.append([2, 0, 0, 0.4])
peaks.append([1.5, 0.5, 0.5, 0.1])
peaks.append([2, 1, 0, 0.5])
peaks.append([2, 1, 1, 0.4])
peaks.append([2, 2, 0, 0.3])
sigma = 0.1 + 0.4 * math.sin(math.pi * x / 180.0)
y = 0.0
for peak in peaks:
peakpos = self.peak_position(self.atod(self.aspacing, peak[0], peak[1], peak[2]), self.wavelength + self.wavelength_error)
y += self.funx(x, peakpos + self.peakpos_error, sigma) * peak[3]
return y
if __name__ == '__main__':
nickel = CubicPowderSample()
px = []
for i in range(200, 891):
x = 0.1 * i
px.append(x)
from sys import argv
from mpl_toolkits.axes_grid.axislines import SubplotZero
import matplotlib.pyplot as plt
fig = plt.figure(1)
ax = SubplotZero(fig, 111)
fig.add_subplot(ax)
for direction in ["xzero", "yzero"]:
#ax.axis[direction].set_axisline_style("-|>")
ax.axis[direction].set_visible(True)
for direction in ["left", "right", "bottom", "top"]:
ax.axis[direction].set_visible(False)
if len(argv) > 1:
for i in range(1, len(argv)):
print "argv[%d]:" % i, argv[i]
nickel.set_wavelength_from_theta_pg002(float(argv[i]))
py = []
for x in px:
y = nickel.calc(x)
py.append(y)
ax.plot(px, py)
else:
py = []
for x in px:
y = nickel.calc(x)
py.append(y)
ax.plot(px, py)
print "len:", len(px), len(py)
plt.show()