121 lines
4.3 KiB
Python
Executable File
121 lines
4.3 KiB
Python
Executable File
#!/usr//bin/env python
|
|
# vim: ts=8 sts=4 sw=4 expandtab
|
|
# Author: Douglas Clowes (dcl@ansto.gov.au) 2012-06-29
|
|
from twisted.internet import reactor
|
|
from twisted.python import log
|
|
from twisted.internet.task import LoopingCall
|
|
import argparse
|
|
import random
|
|
import re
|
|
import os
|
|
import sys
|
|
import time
|
|
import inspect
|
|
from galilfactory import GalilFactory
|
|
|
|
import curses
|
|
sys.path.insert(0, os.path.realpath(os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]),"../../util"))))
|
|
from displayscreen import Screen
|
|
|
|
def device_iterator():
|
|
now = time.time()
|
|
|
|
def display_iterator():
|
|
global screen, devices, factories
|
|
|
|
try:
|
|
rows, cols = screen.stdscr.getmaxyx()
|
|
col = 0
|
|
column_width = 78 / len(devices.keys())
|
|
for idx in sorted(devices.keys()):
|
|
line = 0;
|
|
col += 1
|
|
header = "%s=%d" % (idx, factories[idx].numProtocols)
|
|
screen.stdscr.addstr(line, column_width * (col - 1) + 2, header.rjust(column_width-1))
|
|
line = 1
|
|
for mtr in sorted(devices[idx].motors.keys()):
|
|
screen.stdscr.addstr(line, 0, "%1s:" % mtr)
|
|
try:
|
|
name = devices[idx].motors[mtr].name
|
|
if len(name) > column_width-1:
|
|
name = name[:column_width-1]
|
|
if (devices[idx].motors[mtr].motorState == "OFF"):
|
|
screen.stdscr.addstr(line, column_width * (col - 1) + 2, "%s" % name.rjust(column_width-1), curses.A_DIM)
|
|
else:
|
|
screen.stdscr.addstr(line, column_width * (col - 1) + 2, "%s" % name.rjust(column_width-1), curses.A_STANDOUT)
|
|
except:
|
|
screen.stdscr.addstr(line, column_width * (col - 1) + 2, "%s" % "".rjust(column_width-1))
|
|
value = "%.3f" % devices[idx].motors[mtr].getPosition()
|
|
if len(value) > column_width-1:
|
|
value = value[:column_width-1]
|
|
if (devices[idx].motors[mtr].motorState == "OFF"):
|
|
screen.stdscr.addstr(line + 1, column_width * (col - 1) + 2, "%s" % value.rjust(column_width-1), curses.A_BOLD)
|
|
else:
|
|
screen.stdscr.addstr(line + 1, column_width * (col - 1) + 2, "%s" % value.rjust(column_width-1), curses.A_STANDOUT)
|
|
line += 2
|
|
except:
|
|
raise
|
|
finally:
|
|
try:
|
|
screen.stdscr.refresh()
|
|
except:
|
|
pass
|
|
|
|
class MyScreen(Screen):
|
|
def __init__(self, stdscr):
|
|
Screen.__init__(self, stdscr)
|
|
|
|
def sendLine(self, txt):
|
|
global devices
|
|
dev, cmd = line.split(":")
|
|
print "Dev:", dev, "Cmd:", cmd
|
|
if dev in devices:
|
|
myDev = devices[dev]
|
|
myDev.protocol = self
|
|
myDev.dataReceived(cmd)
|
|
|
|
def write(self, txt):
|
|
try:
|
|
newLine = self.lines[-1] + " => " + txt
|
|
del self.lines[-1]
|
|
self.addLine(newLine)
|
|
except:
|
|
pass
|
|
|
|
def main(**kwargs):
|
|
global screen, devices, factories
|
|
parser = argparse.ArgumentParser( description='Generates fake Galil controllers for testing SICS' )
|
|
parser.add_argument('instrument', help='The instrument name')
|
|
parser.add_argument('-w', '--window', help='Create a display window', action='store_true', default=False)
|
|
args = parser.parse_args()
|
|
|
|
basePort = {
|
|
'echidna': 62034, 'wombat': 62134, 'kowari': 62230, 'dingo': 62300, 'quokka': 62430, 'platypus': 62530,
|
|
'pelican': 62630, 'taipan': 62730, 'lyrebird': 62830, 'kookaburra': 62930, 'bilby': 63030, 'emu': 63130
|
|
}
|
|
#log.startLogging(sys.stdout)
|
|
log.startLogging(open(("/tmp/Fake_Galil_%d.log" % basePort[args.instrument]), "w"))
|
|
devices = {}
|
|
factories = {}
|
|
for dev in range(0, 8):
|
|
port = basePort[args.instrument] + dev
|
|
controllerName = "mc%d" % (dev + 1)
|
|
factory = GalilFactory(port)
|
|
devices[controllerName] = factory.device
|
|
factories[controllerName] = factory
|
|
reactor.listenTCP(port, factory)
|
|
|
|
if (args.window):
|
|
stdscr = curses.initscr()
|
|
screen = MyScreen(stdscr)
|
|
reactor.addReader(screen)
|
|
|
|
#lc = LoopingCall(device_iterator)
|
|
#lc.start(0.5)
|
|
dc = LoopingCall(display_iterator)
|
|
dc.start(0.5)
|
|
reactor.run()
|
|
|
|
if __name__ == '__main__':
|
|
main()
|