Add a display screen for motor position to taipan fake motors

This commit is contained in:
Douglas Clowes
2013-06-26 17:24:47 +10:00
parent 7d35ca4cf9
commit 99f18f8798

View File

@@ -13,12 +13,8 @@ from galilfactory import GalilFactory
from counterclient import CounterClientFactory from counterclient import CounterClientFactory
from powdersample import CubicPowderSample from powdersample import CubicPowderSample
log.startLogging(sys.stdout) import curses
from displayscreen import Screen
beam_monitor_counter = None
detector_counter = None
beam_monitor_rate = None
nickel = CubicPowderSample()
def stopCallbackM1M2(motor): def stopCallbackM1M2(motor):
global nickel global nickel
@@ -58,34 +54,101 @@ def device_iterator():
m2.doIteration(now) m2.doIteration(now)
s2.doIteration(now) s2.doIteration(now)
basePort = 62034 # Echidna def display_iterator():
basePort = 62134 # Wombat global screen, devices
basePort = 62430 # Quokka
basePort = 62730 # Taipan
devices = {}
for dev in range(0, 6):
port = basePort + dev
controllerName = "mc%d" % (dev + 1)
factory = GalilFactory(port)
devices[controllerName] = factory.device
reactor.listenTCP(port, factory)
s2 = devices["mc2"].motors["F"] try:
s2.moveCallback = s2.stopCallback = stopCallbackS2 rows, cols = screen.stdscr.getmaxyx()
m1 = devices["mc3"].motors["E"] col = 0
m1.moveCallback = m1.stopCallback = stopCallbackM1M2 for idx in sorted(devices.keys()):
m2 = devices["mc1"].motors["F"] line = 0;
m2.moveCallback = m2.stopCallback = stopCallbackM1M2 col += 1
m1.stopCallback(m1) screen.stdscr.addstr(line, 13 * (col - 1), idx)
m2.stopCallback(m2) for mtr in sorted(devices[idx].motors.keys()):
s2.stopCallback(s2) line += 1
screen.stdscr.addstr(line, 13 * (col - 1), "%1s:%8.3f" % (mtr, devices[idx].motors[mtr].getPosition()))
if (devices[idx].motors[mtr].motorState == "OFF"):
screen.stdscr.addstr(line, 13 * (col - 1) + 10, " ")
else:
screen.stdscr.addstr(line, 13 * (col - 1) + 10, "*")
except:
raise
finally:
try:
screen.stdscr.refresh()
except:
pass
beam_monitor_counter = CounterClientFactory() class MyScreen(Screen):
detector_counter = CounterClientFactory() def __init__(self, stdscr):
connector1 = reactor.connectTCP("localhost", 33001, beam_monitor_counter) Screen.__init__(self, stdscr)
connector2 = reactor.connectTCP("localhost", 33000, detector_counter)
print "Connector1:", connector1.getDestination() def sendLine(self, txt):
print "Connector2:", connector2.getDestination() global devices
lc = LoopingCall(device_iterator) dev, cmd = line.split(":")
lc.start(0.05) print "Dev:", dev, "Cmd:", cmd
reactor.run() if dev in devices:
myDev = devices[dev]
myDev.protocol = self
myDev.dataReceived(cmd)
def write(self, txt):
try:
newLine = self.lines[-1] + " => " + txt
del self.lines[-1]
self.addLine(newLine)
except:
pass
if __name__ == "__main__":
global screen, devices
basePort = 62034 # Echidna
basePort = 62134 # Wombat
basePort = 62430 # Quokka
basePort = 62730 # Taipan
#log.startLogging(sys.stdout)
log.startLogging(open(("/tmp/Fake_Galil_%d.log" % basePort), "w"))
beam_monitor_counter = None
detector_counter = None
beam_monitor_rate = None
nickel = CubicPowderSample()
devices = {}
for dev in range(0, 6):
port = basePort + dev
controllerName = "mc%d" % (dev + 1)
factory = GalilFactory(port)
devices[controllerName] = factory.device
reactor.listenTCP(port, factory)
s2 = devices["mc2"].motors["F"]
s2.moveCallback = s2.stopCallback = stopCallbackS2
m1 = devices["mc3"].motors["E"]
m1.moveCallback = m1.stopCallback = stopCallbackM1M2
m2 = devices["mc1"].motors["F"]
m2.moveCallback = m2.stopCallback = stopCallbackM1M2
m1.stopCallback(m1)
m2.stopCallback(m2)
s2.stopCallback(s2)
beam_monitor_counter = CounterClientFactory()
detector_counter = CounterClientFactory()
connector1 = reactor.connectTCP("localhost", 33001, beam_monitor_counter)
connector2 = reactor.connectTCP("localhost", 33000, detector_counter)
print "Connector1:", connector1.getDestination()
print "Connector2:", connector2.getDestination()
stdscr = curses.initscr()
screen = MyScreen(stdscr)
reactor.addReader(screen)
print "stdscr:", stdscr
print "screen:", screen
lc = LoopingCall(device_iterator)
lc.start(0.05)
dc = LoopingCall(display_iterator)
dc.start(0.5)
reactor.run()