diff --git a/site_ansto/instrument/TEST_SICS/fakeGalil/SIM_GALIL_taipan.py b/site_ansto/instrument/TEST_SICS/fakeGalil/SIM_GALIL_taipan.py new file mode 100755 index 00000000..aa5d40d5 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeGalil/SIM_GALIL_taipan.py @@ -0,0 +1,91 @@ +#!/usr//bin/env python +# vim: ts=8 sts=4 sw=4 expandtab +# Author: Douglas Clowes (dcl@ansto.gov.au) 2012-06-29 +from twisted.internet import reactor +from twisted.python import log +from twisted.internet.task import LoopingCall +import random +import re +import sys +import time +import inspect +from galilfactory import GalilFactory +from counterclient import CounterClientFactory +from powdersample import CubicPowderSample + +log.startLogging(sys.stdout) + +beam_monitor_counter = None +detector_counter = None +beam_monitor_rate = None +nickel = CubicPowderSample() + +def stopCallbackM1M2(motor): + global nickel + global beam_monitor_rate + mu = m2.getPosition() / 2.0 + nickel.set_wavelength_from_theta_pg002(mu) + x = m1.getPosition() + y = nickel.funx(abs(x), abs(mu), 0.2) + baseline = 1000 + height = int(5000 * y) + baseline + beam_monitor_rate = height + print "My Height at", x, "is", height + if beam_monitor_counter: + beam_monitor_counter.setCountRate(height) + if detector_counter: + stopCallbackS2(m1) + print "Motor M1, M2 at:", m1.getPosition(), m2.getPosition(), ", wavelength=", nickel.wavelength, ", mEv=", nickel.mEv + + +def stopCallbackS2(motor): + global nickel + global beam_monitor_rate + if beam_monitor_rate == None: + m1.stopCallback(m1) + x = s2.getPosition() + y = nickel.calc(abs(x)) + baseline = 2000 + height = int(beam_monitor_rate * y) + baseline + print "My Height at", x, "is", height + if detector_counter: + detector_counter.setCountRate(height) + print "Motor S2 at:", s2.getPosition() + +def device_iterator(): + now = time.time() + m1.doIteration(now) + m2.doIteration(now) + s2.doIteration(now) + +basePort = 62034 # Echidna +basePort = 62134 # Wombat +basePort = 62430 # Quokka +basePort = 62730 # Taipan +devices = {} +for dev in range(0, 6): + port = basePort + dev + controllerName = "mc%d" % (dev + 1) + factory = GalilFactory(port) + devices[controllerName] = factory.device + reactor.listenTCP(port, factory) + +s2 = devices["mc2"].motors["F"] +s2.moveCallback = s2.stopCallback = stopCallbackS2 +m1 = devices["mc3"].motors["E"] +m1.moveCallback = m1.stopCallback = stopCallbackM1M2 +m2 = devices["mc1"].motors["F"] +m2.moveCallback = m2.stopCallback = stopCallbackM1M2 +m1.stopCallback(m1) +m2.stopCallback(m2) +s2.stopCallback(s2) + +beam_monitor_counter = CounterClientFactory() +detector_counter = CounterClientFactory() +connector1 = reactor.connectTCP("localhost", 33001, beam_monitor_counter) +connector2 = reactor.connectTCP("localhost", 33000, detector_counter) +print "Connector1:", connector1.getDestination() +print "Connector2:", connector2.getDestination() +lc = LoopingCall(device_iterator) +lc.start(0.05) +reactor.run() diff --git a/site_ansto/instrument/TEST_SICS/fakeGalil/counterclient.py b/site_ansto/instrument/TEST_SICS/fakeGalil/counterclient.py new file mode 100644 index 00000000..95519a65 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeGalil/counterclient.py @@ -0,0 +1,52 @@ +#!/usr//bin/env python +# vim: ts=8 sts=4 sw=4 expandtab +# Author: Douglas Clowes (dcl@ansto.gov.au) 2012-06-29 +from twisted.internet.protocol import Protocol +from twisted.internet.protocol import ReconnectingClientFactory + +myCounters = {} + +class Counter(Protocol): + def dataReceived(self, data): + print "Data:", data + + def connectionMade(self): + print "connectionMade", self.transport.getHost(), self.transport.getPeer() + + def connectionLost(self, reason): + print "connectionLost" + + def sendMessage(self, m): + self.transport.write(m) + +class CounterClientFactory(ReconnectingClientFactory): + def __init__(self): + self.device = None + + def startedConnecting(self, connector): + print 'Started to connect.' + + def buildProtocol(self, addr): + global myCounters + print 'Connected to:', addr + self.resetDelay() + p = Counter() + myCounters[addr.port] = p + self.device = p + print "My Counters", myCounters + return p + + def clientConnectionLost(self, connector, reason): + global myCounters + print 'Lost connection', connector.getDestination().port,'Reason:', reason + del myCounters[connector.getDestination().port] + self.device = None + ReconnectingClientFactory.clientConnectionLost(self, connector, reason) + + def clientConnectionFailed(self, connector, reason): + print 'Connection failed. Reason:', reason + ReconnectingClientFactory.clientConnectionFailed(self, connector,reason) + + def setCountRate(self, rate): + if self.device: + self.device.sendMessage("SET SOURCE=%d\r\n" % rate) diff --git a/site_ansto/instrument/TEST_SICS/fakeGalil/galilcontroller.py b/site_ansto/instrument/TEST_SICS/fakeGalil/galilcontroller.py new file mode 100644 index 00000000..aa4bc2aa --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeGalil/galilcontroller.py @@ -0,0 +1,215 @@ +#!/usr//bin/env python +# vim: ts=8 sts=4 sw=4 expandtab +# Author: Douglas Clowes (dcl@ansto.gov.au) 2012-06-29 +import string +import time +import re +from galilmotor import GalilMotor + +class GalilController(object): + """Galil Controller motor controller object""" + + def __init__(self, thePort): + print "GalilController ctor: %d" % int(thePort) + self.port = thePort + self.motors = {} + for letter in "ABCDEFGH": + self.motors[letter] = GalilMotor(letter) + self.reset_powerup() + + def reset_powerup(self): + print self.__class__.__name__, "reset_powerup" + self.variables = {} + + def write(self, response): + #print "Device Response: %s" % response + self.response.append(response) + + def doIteration(self, now): + #print "DoIteration" + # pass to motors + for letter in "ABCDEFGH": + self.motors[letter].doIteration(now) + + def doAssignment(self, cmd, arg): + action = cmd[0:2] + axis = cmd[2:] + #print "Assignment %s(%s) = %s" % (action, axis, arg) + if action in ["PA", "SP", "AC", "DC"]: + if len(axis) == 1 and axis in "ABCDEFGH": + self.motors[axis].doAssignment(action, arg) + return + self.variables[cmd] = arg + + def doCommandMG(self, cmd, arg): + #print cmd, arg + if (arg[0] == "\"" or arg[1] == "'") and arg[0] == arg[-1]: + arg = arg[1:-1] + if re.match("^CONFIG[A-H]=", arg): + the_axis = arg[6] + the_config = arg[8:] + self.motors[the_axis].doConfigure(the_config) + self.write(arg) + return + if re.match("^TRACE[A-H]=", arg): + the_axis = arg[5] + the_config = arg[7:] + self.motors[the_axis].doTrace(the_config) + self.write(arg) + return + if re.match("^DROP", arg): + self.protocol.transport.loseConnection(); + return + # TODO replace this simple stuff with more complex + if re.match("^{[^}]+}", arg): + the_match = re.match("^({[^}]+})(.*)$", arg) + the_format = the_match.group(1) + the_rest = string.strip(the_match.group(2)) + arg = the_rest + response = [] + for item in arg.split(","): + item = string.strip(item) + #print "Item:", item + if item == "_TI0": + response.append(" 0000000000") + elif item == "_TI1": + response.append(" 0000000000") + elif item == "_XQ0": + response.append(" 0000000123") + elif len(item) == 4 and item[0] == "_": + axis = arg[3] + param = self.motors[axis].doQuery(item[1:3]) + if param < 0: + paramstr = "-" + ("%010i" % abs(param)) + else: + paramstr = " " + ("%010i" % param) + response.append(paramstr) + result = string.join(response, "") + #print result + self.write(result) + + def doMotorCommand(self, cmd, arg): + # TODO replace this simple stuff with more complex + action = cmd[0:2] + axes = cmd[2:] + arg + for axis in axes: + self.motors[axis].doCommand(action, None) + + def doCommandLV(self, cmd, arg): + code = "SIMULATE" + self.variables[code] = "1" + keys = self.variables.keys() + keys.sort() + for key in keys: + self.write("%s= %s" % (key, self.variables[key])) + del self.variables[code] + + def doCommandDA(self, cmd, arg): + key = cmd[3:] + if key in self.variables.keys(): + del self.variables[key] + + def doCommand(self, cmd, arg): + action = cmd[0:2] + #print "Command %s(%s) = %s" % (action, cmd[2:], arg) + if action in ["BG", "ST", "TP", "TD", "SH", "MO"]: + self.doMotorCommand(cmd, arg) + if action == "LV": + self.doCommandLV(cmd, arg) + return + elif action == "DA": + self.doCommandDA(cmd, arg) + elif action == "MG": + self.doCommandMG(cmd, arg) + + def processDataReceived(self, data): + #print "PDU: \"" + data + "\"" + self.response = [] + the_match = re.match("^([A-Z0-9_*]+)(.*)$", data) + if the_match and the_match.group(2)[:1] == "=": + cmd = the_match.group(1) + arg = the_match.group(2)[1:] + self.doAssignment(cmd, arg) + elif re.match("([A-Z][A-Z]) *(.*)$", data): + match = re.match("([A-Z][A-Z]) *(.*)$", data) + cmd = match.group(1) + arg = match.group(2) + self.doCommand(cmd, arg) + else: + print "Command not recognized:", data + cmd = "Command" + arg = "Unknown" + # TODO replace this simple stuff with more complex + #print "OUT: \"" + self.response + "\"" + return self.response + + def dataReceived(self, data): + self.doIteration(time.time()) + return self.processDataReceived(data) + +if __name__ == '__main__': + totalDistance = 100000 + mc = GalilController(0) + print "LV", mc.processDataReceived("LV") + print "MG", mc.processDataReceived("MG \"TRACEA=ON\"") + print "MG", mc.processDataReceived("MG \"CONFIGA=SPX=25000.0,CPX=8192.0,RL=-10.0,FL=10.0,UH=0.0,EH=100000000\"") + print "SP", mc.processDataReceived("SPA=25000") + print "AC", mc.processDataReceived("ACA=25000") + print "DC", mc.processDataReceived("DCA=25000") + print "PA", mc.processDataReceived("PAA=%d" % totalDistance) + start_position = int(mc.processDataReceived("MG _TDA")[0]) + print "SH", mc.processDataReceived("SHA") + print "BG", mc.processDataReceived("BGA") + startTime = time.time() + currentDistance = 0 + print "MO", mc.processDataReceived("MG _MOA") + print "BG", mc.processDataReceived("MG _BGA") + count = 0 + #previousTime = 0 + while int(mc.processDataReceived("MG _BGA")[0]) == 1 and count <= 100: + t = 0.1 * count + mc.doIteration(mc.motors["A"].motorStartTime + t) + rnge = 70 + temp = currentDistance + currentDistance = abs(int(mc.processDataReceived("MG _TDA")[0]) - start_position) + #percentDistance = 100.0 * currentDistance / totalDistance + #percentVelocity = 100.0 * (currentDistance - temp) / (t - previousTime) / mc.processDataReceived("MG _SPA")[0] + #previousTime = t + percentDistance = mc.motors["A"].percentDistance + percentVelocity = mc.motors["A"].percentVelocity + vspot = int(0.01 * rnge * percentVelocity) + sspot = int(0.01 * rnge * percentDistance) + bth = "b" + if sspot >= 0: + ess = "s" + else: + ess = "*" + bth = "B" + sspot = 0 + if vspot >= 0: + vee = "v" + else: + vee = "^" + bth = "B" + vspot = 0 + if vspot < sspot: + line = " "*(vspot) + vee + " "*(sspot - vspot - 1) + ess + " "*(rnge - sspot) + "|" + line += " %2d %2d" % (vspot, sspot) + elif vspot > sspot: + line = " "*(sspot) + ess + " "*(vspot - sspot - 1) + vee + " "*(rnge - vspot) + "|" + line += " %2d %2d" % (sspot, vspot) + else: + line = " "*(sspot) + bth + " "*(rnge - sspot) + "|" + line += " %2d" % (sspot) + print "%3.2f: %s" % (t, line) + count += 1 + print "_MO", mc.processDataReceived("MG _MOA") + print "_BG", mc.processDataReceived("MG _BGA") + print "_TP", mc.processDataReceived("MG _TPA") + print "_TD", mc.processDataReceived("MG _TDA") + print "MO", mc.processDataReceived("MOA") + print "_MO", mc.processDataReceived("MG _MOA") + print "ZZ", mc.processDataReceived("ZZA") + print "_ZZ", mc.processDataReceived("MG _ZZA") + print "MG", mc.processDataReceived("MG _MOA, _BGA, _TDA, _TPA, _SPA, _ACA, _DCA") + print "Iterations:", count diff --git a/site_ansto/instrument/TEST_SICS/fakeGalil/galilfactory.py b/site_ansto/instrument/TEST_SICS/fakeGalil/galilfactory.py new file mode 100644 index 00000000..5a2556fb --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeGalil/galilfactory.py @@ -0,0 +1,88 @@ +#!/usr//bin/env python +# vim: ts=8 sts=4 sw=4 expandtab +# Author: Douglas Clowes (dcl@ansto.gov.au) 2012-06-29 +from twisted.internet.protocol import Protocol +from twisted.internet.protocol import ServerFactory +from galilcontroller import GalilController + +class GalilProtocol(Protocol): + """Protocol object used by the Twisted Infrastructure to handle connections + """ + + def __init__(self, thisDevice): + print "GalilProtocol protocol ctor", thisDevice + self.device = thisDevice + self.term = "\r\n" + + def write(self, response): + self.response = self.response + response + + def connectionMade(self): + self.pdu = "" + self.response = "" + self.device.protocol = self + self.factory.numProtocols = self.factory.numProtocols + 1 + print "connectionMade: connection number", self.factory.numProtocols + if self.factory.numProtocols > 2: + print "Too many connections - rejecting" + self.transport.write("Too many connections, try later\r\n") + self.transport.loseConnection() + '''else: + self.transport.write("Welcome connection %d\r\n" % self.factory.numProtocols)''' + + def connectionLost(self, reason): + self.factory.numProtocols = self.factory.numProtocols - 1 + print "connectionLost: connections left", self.factory.numProtocols + + def dataReceived(self, data): + #print "dataReceived - len:", len(data) + last_was_cr = False + for c in data: + if c == "\r" or c == ";" or c == "\n": + if c == "\n" and last_was_cr: + continue + last_was_cr = False + if c == "\r": + last_was_cr = True + if len(self.pdu) > 0: + #print "Request: ", self.pdu + the_response = self.device.dataReceived(self.pdu) + #print "Response:", the_response + for item in the_response: + self.write(item + self.term) + self.pdu = "" + if c == ";": + if len(self.response) > 0 and self.response[-1] != ":": + self.response = self.response + ":" + else: + if len(self.response) > 0: + if self.response[-1] == ";": + self.response = self.response[:-1] + if len(self.response) > 0 and self.response[-1] != ":": + self.response = self.response + ":" + if len(self.response) == 0: + self.response = self.response + ":" + if len(self.response) > 0: + #print "Protocol Response: %s" % self.response + self.transport.write(self.response) + self.response = "" + else: + self.pdu = self.pdu + c + +class GalilFactory(ServerFactory): + """Factory object used by the Twisted Infrastructure to create a Protocol + object for incomming connections""" + + protocol = GalilProtocol + + def __init__(self, thePort): + print "GalilFactory ctor: " + self.port = thePort + self.device = GalilController(thePort) + self.numProtocols = 0 + + def buildProtocol(self, addr): + p = self.protocol(self.device) + p.factory = self + return p + diff --git a/site_ansto/instrument/TEST_SICS/fakeGalil/galilmotor.py b/site_ansto/instrument/TEST_SICS/fakeGalil/galilmotor.py new file mode 100644 index 00000000..ec1dfd4e --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeGalil/galilmotor.py @@ -0,0 +1,350 @@ +#!/usr//bin/env python +# vim: ts=8 sts=4 sw=4 expandtab +# Author: Douglas Clowes (dcl@ansto.gov.au) 2012-06-29 +import inspect +import time +import math + +class GalilMotor(object): + """Galil Motor Object""" + + def __init__(self, theAxis): + print "GalilMotor ctor: %s" % theAxis + self.axis = theAxis + self.motorState = "OFF" + self.stepsPerX = 25000.0 + self.cntsPerX = 8192.0 + self.speed = 25000 + self.accel = 2500 + self.decel = 2500 + self.lowerLim = -20.0 + self.upperLim = 20.0 + self.hasAbsEnc = True + self.unitsHome = 0.0 + self.absEncHome = 100 * 1000 * 1000 + self.motorHome = 0.0 + self.currentSteps = 0 + self.targetSteps = self.currentSteps + self.configDone = False + self.switches = 44 + self.stopcode = 0 + self.stopCallback = None + self.moveCallback = None + self.trace = False + + def doTrace(self, arg): + print "Trace %s: %s" % (self.axis, arg), + if self.trace: + print "(ON =>", + else: + print "(OFF =>", + if arg == "ON": + self.trace = True + else: + self.trace = False + if self.trace: + print "ON)" + else: + print "OFF)" + + def doConfigure(self, arg): + print "Configure %s: %s" % (self.axis, arg) + the_parts = arg.split(",") + configDoneAlready = self.configDone + for part in the_parts: + (lh, rh) = part.split("=") + if lh == "SPX": + self.stepsPerX = float(rh) + self.configDone = True + elif lh == "CPX": + self.cntsPerX = float(rh) + self.configDone = True + elif lh == "RL": + self.lowerLim = float(rh) + self.configDone = True + elif lh == "FL": + self.upperLim = float(rh) + self.configDone = True + elif lh == "UH": + self.unitsHome = float(rh) + self.configDone = True + elif lh == "EH": + self.hasAbsEnc = True + self.absEncHome = float(rh) + self.configDone = True + elif lh == "MH": + self.hasAbsEnc = False + self.motorHome = float(rh) + self.configDone = True + else: + print "Unrecognized config item \"%s\" in \"%s\"" % (lh, arg) + if self.configDone and not configDoneAlready: + self.currentSteps = 0 + self.targetSteps = 0 + + def getEncoder(self): + '''Calculate the encoder from the currentSteps''' + value = self.absEncHome + self.currentSteps * (self.cntsPerX / self.stepsPerX) + return int(value) + + def getPosition(self): + '''Calculate the unit position''' + value = self.unitsHome + self.currentSteps / self.stepsPerX + value = self.getEncoder() - self.absEncHome + value /= self.cntsPerX + value += self.unitsHome + return value + + def doIteration(self, now): + '''If moving then update speed and position using + v = u + at + s = ut + 1/2 at**2 = (u + v)t / 2 + v**2 = u**2 + 2as + ''' + if self.motorState in ["MOVING"]: + t = now - self.motorStartTime + distance = abs(self.targetSteps - self.motorStartSteps) + topSpeed = math.sqrt(2.0 * distance / ( 1.0 / self.accel + 1.0 / self.decel)) + if topSpeed < 1e-6: + topSpeed = 0 + if topSpeed > self.speed: + topSpeed = self.speed + t1 = topSpeed / self.accel + s1 = 0.5 * self.accel * t1**2 + t3 = topSpeed / self.decel + s3 = 0.5 * self.decel * t3**2 + s2 = distance - s1 - s3 + if s2 < 1e-6: + s2 = 0 + if s2 == 0 or topSpeed == 0: + t2 = 0 + else: + t2 = s2 / topSpeed + if t >= (t1 + t2 + t3): + # Finished + s = distance + v = 0 + self.motorState = "STOPPED" + self.stopcode = 1 + self.switches = 44 # stopped, both limits clear + elif t >= (t1 + t2): + # Ramp Down + dt = t - (t1 + t2) + # v = u + at + v = topSpeed - self.decel * dt + # s = (u + v)t / 2 + s = s1 + s2 + 0.5 * (topSpeed + v) * dt + elif t >= t1: + # Run Flat + v = topSpeed + s = s1 + topSpeed * (t - t1) + else: + # Ramp Up + v = 0.0 + self.accel * t + s = 0.5 * self.accel * t**2 + if self.speed < 1e-6: + self.percentVelocity = 100.0 + else: + self.percentVelocity = 100.0 * v / self.speed + if distance < 1e-6: + self.percentDistance = 100.0 + else: + self.percentDistance = 100.0 * s / distance + if self.targetSteps > self.motorStartSteps: + self.currentSteps = self.motorStartSteps + int(s) + if self.currentSteps >= self.targetSteps: + self.motorState = "STOPPED" + self.stopcode = 1 + self.switches = 44 # stopped, both limits clear + elif self.targetSteps < self.motorStartSteps: + self.currentSteps = self.motorStartSteps - int(s) + if self.currentSteps <= self.targetSteps: + self.motorState = "STOPPED" + self.stopcode = 1 + self.switches = 44 # stopped, both limits clear + else: + self.motorState = "STOPPED" + self.stopcode = 1 + self.switches = 44 # stopped, both limits clear + currentPosition = self.getPosition() + if not self.moveCallback == None: + self.moveCallback(self) + # TODO Forward Limit + # TODO Reverse Limit + if self.motorState == "STOPPED": + print "Axis %s stopped at:" % self.axis, currentPosition + self.currentSteps = self.targetSteps + if not self.stopCallback == None: + self.stopCallback(self) + else: + pass + + def doAssignment(self, cmd, arg): + '''This is for commands like "PAA=1234567" which are for the motor. + The axis and the assignment operator have already been removed. + ''' + if self.trace: + print "Trace ASG %s: %s" % (self.axis, str(cmd) + " " + str(arg)) + if cmd == "PA": + self.targetSteps = int(arg) + elif cmd == "PR": + self.targetSteps = self.currentSteps + int(arg) + elif cmd == "SP": + self.speed = max(1, abs(int(arg))) + elif cmd == "AC": + self.accel = max(1, abs(int(arg))) + elif cmd == "DC": + self.decel = max(1, abs(int(arg))) + else: + print "Unknown assignment", cmd, arg + + def doQuery(self, arg): + if arg == "TD": + response = self.currentSteps + elif arg == "ZZ": + response = self.getPosition() + elif arg == "TP": + response = self.getEncoder() + elif arg == "SP": + response = self.speed + elif arg == "AC": + response = self.accel + elif arg == "DC": + response = self.decel + elif arg == "TS": + response = self.switches + elif arg == "SC": + response = self.stopcode + elif arg == "MO": + if self.motorState in ["OFF"]: + response = 1 + else: + response = 0 + elif arg == "BG": + if self.motorState in ["MOVING"]: + response = 1 + else: + response = 0 + else: + print "Unknown Query:", arg + response = None + if self.trace: + print "Trace QRY %s: %s" % (self.axis, str(arg) + " = " + str(response)) + return response + + def doCommandTD(self, arg): + return self.doQuery("TD") + + def doCommandZZ(self, arg): + result = self.doQuery("ZZ") + print "Position:", result + return result + + def doCommandTP(self, arg): + return self.doQuery("TP") + + def doCommandST(self, arg): + if self.motorState in ["MOVING"]: + self.motorState = "STOPPED" + self.stopcode = 4 + self.switches = 44 + return True + + def doCommandBG(self, arg): + if self.motorState in ["ON", "STOPPED"]: + self.motorState = "MOVING" + self.stopcode = 0 + self.switches = 140 # moving, both limits clear + self.motorStartTime = time.time() + self.motorStartSteps = self.currentSteps + return True + return False + + def doCommandSH(self, arg): + if self.motorState in ["ON", "OFF", "STOPPED"]: + self.motorState = "ON" + return True + return False + + def doCommandMO(self, arg): + if self.motorState in ["ON", "OFF", "STOPPED"]: + self.motorState = "OFF" + return True + return False + + def doCommandPA(self, arg): + self.targetSteps = int(arg) + return True + + def doCommandPR(self, arg): + self.targetSteps = self.currentSteps + int(arg) + return True + + def doCommand(self, cmd, arg): + '''This is for commands like "BGA" which are for the motor. + The axis and other cruft have already been removed. + ''' + action = "inspect.ismethod(GalilMotor.doCommand%s)" % cmd + if eval(action): + if self.trace: + data = cmd + if arg: + data = data + " " + arg + print "Trace CMD: \"" + data + "\"" + action = "response = self.doCommand%s(%s)" % (cmd, arg) + exec action + return response + else: + print "Unknown Action:", eval(action), "for cmd:", action + +if __name__ == '__main__': + motor = GalilMotor("A") + motor.doConfigure("SPX=25000.0,CPX=8192.0,RL=-10.0,FL=10.0,UH=0.0,EH=100000000") + print "SP", motor.doAssignment("SP", "25000") + print "AC", motor.doAssignment("AC", "25000") + print "DC", motor.doAssignment("DC", "25000") + print "PA", motor.doAssignment("PA", "100000") + print "SH", motor.doCommand("SH", None) + print "BG", motor.doCommand("BG", None) + print "MO", motor.doQuery("MO") + print "BG", motor.doQuery("BG") + count = 0 + while motor.doQuery("BG") and count <= 100: + t = 0.1 * count + motor.doIteration(motor.motorStartTime + t) + rnge = 70 + vspot = int(0.01 * rnge * motor.percentVelocity) + sspot = int(0.01 * rnge * motor.percentDistance) + bth = "b" + if sspot >= 0: + ess = "s" + else: + ess = "*" + bth = "B" + sspot = 0 + if vspot >= 0: + vee = "v" + else: + vee = "^" + bth = "B" + vspot = 0 + if vspot < sspot: + line = " "*(vspot) + vee + " "*(sspot - vspot - 1) + ess + " "*(rnge - sspot) + "|" + line += " %2d %2d" % (vspot, sspot) + elif vspot > sspot: + line = " "*(sspot) + ess + " "*(vspot - sspot - 1) + vee + " "*(rnge - vspot) + "|" + line += " %2d %2d" % (sspot, vspot) + else: + line = " "*(sspot) + bth + " "*(rnge - sspot) + "|" + line += " %2d" % (sspot) + print "%3.2f: %s" % (t, line) + count += 1 + print "_MO", motor.doQuery("MO") + print "_BG", motor.doQuery("BG") + print "TP", motor.doCommand("TP", None) + print "TD", motor.doCommand("TD", None) + print "ZZ", motor.doCommand("ZZ", None) + print "_ZZ", motor.doQuery("ZZ") + print "MO", motor.doCommand("MO", None) + print "_MO", motor.doQuery("MO") + print "Iterations:", count diff --git a/site_ansto/instrument/TEST_SICS/fakeGalil/powdersample.py b/site_ansto/instrument/TEST_SICS/fakeGalil/powdersample.py new file mode 100644 index 00000000..3e158fc5 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeGalil/powdersample.py @@ -0,0 +1,108 @@ +#!/usr//bin/env python +# vim: ts=8 sts=4 sw=4 expandtab +# Author: Douglas Clowes (dcl@ansto.gov.au) 2012-07-18 + +import math + +class CubicPowderSample: + def __init__(self): + self.aspacing = 3.5238 # Angstroms + self.wavelength = 2.34548 # Angstroms + self.wavelength_error = 0.00 # Angstroms + self.peakpos_error = 0.0 # degrees + self.mEv = 0.0 + + def set_wavelength_from_theta_pg002(self, theta): + dd = 3.35416e-10 # For Monochromator PG 0 0 2 + h_for_joules = 6.62606957e-34 + neutron_mass = 1.674927351e-27 + ev_to_joules = 1.602176565e-19 + angle = theta + myWavelength = 2.0 * dd * math.sin(angle * 3.141592654 / 180.0) + if myWavelength > 0: + temp = h_for_joules / (neutron_mass * myWavelength) + joules = 0.5 * neutron_mass * (temp * temp) + self.mEv = 1000 * joules / ev_to_joules + self.wavelength = 1e10 * myWavelength + else: + self.mEv = 0.0 + self.wavelength = 0.0 + print "theta = %g, wl = %g, mEv = %g" % (angle, self.wavelength, self.mEv) + + + def atod(self, the_aspacing, h, k, l): + result = math.sqrt(the_aspacing**2 /(h**2 + k**2 + l**2)) + return result + + def peak_position(self, dspacing, wavelength): + try: + theta = math.asin(wavelength / (2.0 * dspacing)) * 180 / 3.141592654 + return 2.0 * theta + except: + print "error in peak_position(", dspacing, wavelength, ")" + return 0.0 + + def funx(self, x, mu, sigma): + try: + factor = 1.0 / (sigma * math.sqrt(2.0 * math.pi)) + y = factor * math.exp(-0.5 * ((x - mu) / sigma)**2) + return y + except: + print "error in func(", x, mu, sigma, ")" + return 0.0 + + def calc(self, x): + peaks = [] + peaks.append([0.5, 0.5, 0.5, 1.0]) + peaks.append([1, 0, 0, 0.5]) + peaks.append([1, 1, 0, 0.8]) + peaks.append([1, 1, 1, 0.9]) + peaks.append([2, 0, 0, 0.4]) + peaks.append([1.5, 0.5, 0.5, 0.1]) + peaks.append([2, 1, 0, 0.5]) + peaks.append([2, 1, 1, 0.4]) + peaks.append([2, 2, 0, 0.3]) + sigma = 0.1 + 0.4 * math.sin(math.pi * x / 180.0) + y = 0.0 + for peak in peaks: + peakpos = self.peak_position(self.atod(self.aspacing, peak[0], peak[1], peak[2]), self.wavelength + self.wavelength_error) + y += self.funx(x, peakpos + self.peakpos_error, sigma) * peak[3] + return y + +if __name__ == '__main__': + nickel = CubicPowderSample() + px = [] + for i in range(200, 891): + x = 0.1 * i + px.append(x) + from sys import argv + from mpl_toolkits.axes_grid.axislines import SubplotZero + import matplotlib.pyplot as plt + fig = plt.figure(1) + ax = SubplotZero(fig, 111) + fig.add_subplot(ax) + for direction in ["xzero", "yzero"]: + #ax.axis[direction].set_axisline_style("-|>") + ax.axis[direction].set_visible(True) + + for direction in ["left", "right", "bottom", "top"]: + ax.axis[direction].set_visible(False) + + if len(argv) > 1: + for i in range(1, len(argv)): + print "argv[%d]:" % i, argv[i] + nickel.set_wavelength_from_theta_pg002(float(argv[i])) + py = [] + for x in px: + y = nickel.calc(x) + py.append(y) + ax.plot(px, py) + else: + py = [] + for x in px: + y = nickel.calc(x) + py.append(y) + ax.plot(px, py) + print "len:", len(px), len(py) + + plt.show()