Files
sics/site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/SIM_PFEIFFER.py
2014-05-01 10:54:09 +10:00

127 lines
3.4 KiB
Python
Executable File

#!/usr/bin/env python
# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent
# Author: Douglas Clowes (dcl@ansto.gov.au) 2014
from twisted.internet.task import LoopingCall
from twisted.internet import reactor
from twisted.python import log, usage
from Pfeiffer_tpg261 import Pfeiffer_tpg261 as MYBASE
from PfeifferFactory import PfeifferFactory
from PfeifferProtocol import PfeifferProtocol
import os
import sys
sys.path.insert(0, os.path.realpath(os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]),"../../../util"))))
from displayscreen import Screen
class MyOptions(usage.Options):
    """Command-line options for the simulated Pfeiffer device.

    Flags:
        --window / -w   create a display window
    Parameters:
        --logfile / -l  output logfile name
        --port / -p     port number to listen on

    Any positional arguments are collected, in order, in ``self['files']``.
    """

    optFlags = [
        ["window", "w", "Create a display window"],
    ]

    optParameters = [
        ["logfile", "l", None, "output logfile name"],
        ["port", "p", None, "port number to listen on"],
    ]

    def __init__(self):
        """Initialise the option table and start with an empty file list."""
        super(MyOptions, self).__init__()
        self['files'] = []

    def parseArgs(self, *args):
        """Append every positional argument to ``self['files']``."""
        self['files'].extend(args)
class MyScreen(Screen):
    """Display screen that passes typed lines to the simulated device.

    ``sendLine`` hands text to the module-level ``myDev``; ``write`` appends
    text to the most recent line kept in ``self.lines``.
    """

    def __init__(self, stdscr):
        """Initialise the underlying ``Screen`` with the curses window."""
        Screen.__init__(self, stdscr)

    def sendLine(self, txt):
        """Pass ``txt`` to the device as received data.

        The screen installs itself as ``myDev.protocol`` first -- presumably
        so that the device's reply is delivered through ``write`` (confirm
        against the device base class).
        """
        # Only an attribute of the module-level myDev is assigned here, so no
        # ``global`` declaration is required.
        myDev.protocol = self
        myDev.dataReceived(txt)

    def write(self, txt):
        """Replace the last displayed line with ``<last line> => <txt>``.

        This is best effort: if there is no last line, or the text cannot be
        joined or added, the update is dropped silently.
        """
        try:
            # Build the combined text before removing the old line, so a
            # failure here leaves the display unchanged.
            combined = self.lines[-1] + " => " + txt
            del self.lines[-1]
            self.addLine(combined)
        except Exception:
            # Narrower than a bare ``except:`` so that SystemExit and
            # KeyboardInterrupt are not swallowed.
            pass
class MYDEV(MYBASE):
    """The simulated device: the TPG261 base class plus a construction trace."""

    def __init__(self):
        """Initialise the base device, then print ``<class name> ctor``."""
        MYBASE.__init__(self)
        # A single parenthesised expression prints identically whether
        # ``print`` is a statement or a function.
        print(" ".join([MYDEV.__name__, "ctor"]))
def device_display():
    """Run one device iteration and, in window mode, redraw the status lines.

    Reads the module-level ``myDev``, ``myOpts``, ``screen``, ``myFactory``
    and ``myPort``; none of them is rebound here.

    Any exception raised by ``myDev.doIteration()`` propagates to the caller.
    Drawing is best effort: errors while writing or refreshing the window
    are ignored (for example when the text does not fit the terminal), and a
    refresh is always attempted after drawing.
    """
    myDev.doIteration()

    if not myOpts["window"]:
        return

    try:
        window = screen.stdscr
        # Row 0: connection count, random value, identity and listening port.
        window.addstr(0, 0, "Lnks:%2d" % myFactory.numProtocols)
        window.addstr(0, 10, "Rnd:%6.3f" % myDev.RANDOM)
        window.addstr(0, 22, "Identity : %s (%d)" % (myDev.IDN, myPort))
        # Row 1: current simulated pressure.
        window.addstr(1, 0, "Pressure :")
        window.addstr(1, 12, "%11.4E" % myDev.pressure)
    except Exception:
        # A failed draw must not stop the periodic call.
        pass
    finally:
        try:
            screen.stdscr.refresh()
        except Exception:
            pass
if __name__ == "__main__":
    # Script entry point: parse options, build the simulated device, set up
    # logging and the optional curses window, then serve on TCP until the
    # reactor stops. The names bound here (screen, myDev, myOpts, myPort,
    # myFactory) are module globals read by device_display() and MyScreen.

    myOpts = MyOptions()
    try:
        myOpts.parseOptions()
    except usage.UsageError as errortext:
        print('%s: %s' % (sys.argv[0], errortext))
        print('%s: Try --help for usage details.' % (sys.argv[0]))
        raise SystemExit(1)

    myDev = MYDEV()

    default_port = 4261
    myPort = default_port
    if myOpts["port"]:
        try:
            myPort = int(myOpts["port"])
        except ValueError:
            # A non-numeric port is a usage error, not a reason for a traceback.
            print('%s: invalid port number: %s' % (sys.argv[0], myOpts["port"]))
            print('%s: Try --help for usage details.' % (sys.argv[0]))
            raise SystemExit(1)
        # Ports outside 1025..65535 silently fall back to the default.
        if myPort < 1025 or myPort > 65535:
            myPort = default_port

    # In window mode the log goes to a per-port file under /tmp (presumably
    # because the display window occupies the terminal); an explicit
    # --logfile always takes precedence.
    logfile = None
    if myOpts["window"]:
        logfile = "/tmp/Fake_Pfeiffer_%d.log" % (myPort)
    if myOpts["logfile"]:
        logfile = myOpts["logfile"]
    if logfile:
        log.startLogging(open(logfile, "w"))
    else:
        log.startLogging(sys.stdout)

    if myOpts["window"]:
        import curses
        stdscr = curses.initscr()
        screen = MyScreen(stdscr)
        # add screen object as a reader to the reactor
        reactor.addReader(screen)

    myFactory = PfeifferFactory(PfeifferProtocol, myDev, "\r\n")

    # Iterate the device (and redraw the window, if any) every 250 ms.
    lc = LoopingCall(device_display)
    lc.start(0.250)

    reactor.listenTCP(myPort, myFactory)  # server
    reactor.run()