Files
sics/site_ansto/instrument/TEST_SICS/fakeGalil/SIM_GALIL.py
2013-12-20 09:09:01 +11:00

114 lines
3.7 KiB
Python
Executable File

#!/usr/bin/env python
# vim: ts=8 sts=4 sw=4 expandtab
# Author: Douglas Clowes (dcl@ansto.gov.au) 2012-06-29
from twisted.internet import reactor
from twisted.python import log
from twisted.internet.task import LoopingCall
import argparse
import random
import re
import os
import sys
import time
import inspect
from galilfactory import GalilFactory
import curses
sys.path.insert(0, os.path.realpath(os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]),"../../util"))))
from displayscreen import Screen
def device_iterator():
    """Sample the current wall-clock time and discard it.

    This is a stub: the value is not used and nothing is returned.  The
    LoopingCall in main() that would invoke it periodically is commented out.
    """
    timestamp = time.time()
def display_iterator():
    """Redraw the status grid of all fake controllers on the curses screen.

    Reads the module globals ``screen``, ``devices`` and ``factories``.
    Controllers are taken in sorted key order and each one gets a column that
    is 13 characters wide, starting at x = 2:

    * row 0 shows the controller key and its factory's ``numProtocols``;
    * every motor (sorted key order) then takes two rows: its name, dimmed
      and right-justified in 12 characters (truncated to 12), followed by its
      position formatted as "%12.3f".  The position is drawn bold when
      ``motorState == "OFF"``; otherwise it is drawn in standout with a
      trailing "*".

    The motor key is also written as a row label at x = 0.

    Drawing errors propagate to the caller.  The screen is refreshed in all
    cases, on a best-effort basis.
    """
    global screen, devices, factories
    stdscr = screen.stdscr
    try:
        for col, idx in enumerate(sorted(devices)):
            x = 13 * col + 2  # left edge of this controller's column
            stdscr.addstr(0, x, "%8s=%d" % (idx, factories[idx].numProtocols))
            motors = devices[idx].motors
            line = 1
            for mtr in sorted(motors):
                motor = motors[mtr]
                stdscr.addstr(line, 0, "%1s:" % mtr)
                try:
                    # "%12s" right-justifies, so only truncation is needed.
                    stdscr.addstr(line, x, "%12s" % motor.name[:12], curses.A_DIM)
                except Exception:
                    # No usable name for this motor: blank the cell instead.
                    # (Exception, not a bare except, so KeyboardInterrupt and
                    # SystemExit are not swallowed.)
                    stdscr.addstr(line, x, "%12s" % "")
                value = "%12.3f" % motor.getPosition()
                if motor.motorState == "OFF":
                    stdscr.addstr(line + 1, x, "%s " % value, curses.A_BOLD)
                else:
                    stdscr.addstr(line + 1, x, "%s*" % value, curses.A_STANDOUT)
                line += 2
    finally:
        # Always push whatever was drawn to the terminal; a failing refresh
        # must not mask an exception raised while drawing.
        try:
            stdscr.refresh()
        except curses.error:
            pass
class MyScreen(Screen):
    """Screen that routes typed "<device>:<command>" lines to a fake controller.

    NOTE(review): ``self.lines`` and ``self.addLine`` are assumed to be
    provided by the ``Screen`` base class from ``displayscreen`` -- confirm.
    """

    def __init__(self, stdscr):
        Screen.__init__(self, stdscr)

    def sendLine(self, txt):
        """Dispatch one entered line of the form "<device>:<command>".

        The text before the first ":" selects an entry of the global
        ``devices`` mapping; the remainder is handed to that device's
        ``dataReceived``.  Lines without a ":" and unknown device names are
        ignored.
        """
        global devices
        # Split the text that was actually passed in, on the first colon
        # only, so that a command containing ":" is passed through intact.
        dev, sep, cmd = txt.partition(":")
        if not sep:
            return
        # A single pre-formatted argument prints the same text under both the
        # Python 2 print statement and the Python 3 print function.
        print("Dev: %s Cmd: %s" % (dev, cmd))
        if dev in devices:
            myDev = devices[dev]
            # Presumably the device replies through its ``protocol``, which
            # would bring the response back to write() below -- confirm.
            myDev.protocol = self
            myDev.dataReceived(cmd)

    def write(self, txt):
        """Append ``" => " + txt`` to the most recent line on the screen.

        The last entry of ``self.lines`` is removed and re-added, extended
        with the text, via ``self.addLine``.  This is best-effort: if it
        fails (for example because there is no previous line) the text is
        dropped.
        """
        try:
            newLine = self.lines[-1] + " => " + txt
            del self.lines[-1]
            self.addLine(newLine)
        except Exception:
            # Deliberately best-effort, but do not swallow KeyboardInterrupt
            # or SystemExit as a bare except would.
            pass
def main(**kwargs):
    """Start six fake Galil controllers for one instrument and run the reactor.

    Command line:
        instrument    name of the instrument; selects the base TCP port
        -w/--window   also show a curses status window

    Controllers "mc1" to "mc6" listen on six consecutive ports starting at
    the instrument's base port.  Each controller's device and factory are
    published in the module globals ``devices`` and ``factories``.  In window
    mode the global ``screen`` is set as well and the display is redrawn
    every 0.5 s.  Twisted logging goes to /tmp/Fake_Galil_<base port>.log.

    ``kwargs`` is accepted but not used.  The call blocks until the reactor
    stops.
    """
    global screen, devices, factories
    basePort = {
        'echidna': 62034, 'wombat': 62134, 'kowari': 62230, 'dingo': 62300, 'quokka': 62430, 'platypus': 62530,
        'pelican': 62630, 'taipan': 62730, 'lyrebird': 62830, 'kookaburra': 62930, 'bilby': 63030, 'emu': 63130
    }
    parser = argparse.ArgumentParser( description='Generates fake Galil controllers for testing SICS' )
    # Restricting the choices makes an unknown instrument a normal usage
    # error instead of a KeyError traceback further down.
    parser.add_argument('instrument', help='The instrument name', choices=sorted(basePort))
    parser.add_argument('-w', '--window', help='Create a display window', action='store_true', default=False)
    args = parser.parse_args()
    firstPort = basePort[args.instrument]
    #log.startLogging(sys.stdout)
    log.startLogging(open(("/tmp/Fake_Galil_%d.log" % firstPort), "w"))
    devices = {}
    factories = {}
    for dev in range(0, 6):
        port = firstPort + dev
        controllerName = "mc%d" % (dev + 1)
        factory = GalilFactory(port)
        devices[controllerName] = factory.device
        factories[controllerName] = factory
        reactor.listenTCP(port, factory)
    if args.window:
        stdscr = curses.initscr()
        screen = MyScreen(stdscr)
        reactor.addReader(screen)
        #lc = LoopingCall(device_iterator)
        #lc.start(0.5)
        # display_iterator reads the global ``screen``, which only exists in
        # window mode, so the redraw loop is started only here.
        dc = LoopingCall(display_iterator)
        dc.start(0.5)
    try:
        reactor.run()
    finally:
        if args.window:
            # Hand the terminal back in a usable state.  Any curses error is
            # ignored in case the screen has already been shut down.
            try:
                curses.endwin()
            except curses.error:
                pass
# Run the simulator only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()