Files
sics/site_ansto/instrument/TEST_SICS/fakeGalil/SIM_GALIL_taipan.py

155 lines
4.6 KiB
Python
Executable File

#!/usr/bin/env python
# vim: ts=8 sts=4 sw=4 expandtab
# Author: Douglas Clowes (dcl@ansto.gov.au) 2012-06-29
from twisted.internet import reactor
from twisted.python import log
from twisted.internet.task import LoopingCall
import random
import re
import os
import sys
import time
import inspect
from galilfactory import GalilFactory
from counterclient import CounterClientFactory
from powdersample import CubicPowderSample
import curses
sys.path.insert(0, os.path.realpath(os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]),"../../util"))))
from displayscreen import Screen
def stopCallbackM1M2(motor):
    """Recompute the simulated beam-monitor count rate from M1 and M2.

    Half of m2's position is handed to
    ``nickel.set_wavelength_from_theta_pg002``; ``nickel.funx`` is then
    evaluated at the absolute m1 position and that half-angle, and its
    result scales a peak of 5000 counts on top of a baseline of 1000.

    The rate is stored in the module global ``beam_monitor_rate`` and pushed
    to ``beam_monitor_counter`` when one is set.  When ``detector_counter``
    is set, ``stopCallbackS2`` is re-run so the detector rate is recomputed
    from the new beam-monitor rate.

    motor -- the motor that triggered the callback.  Unused: positions are
             always read from the module globals ``m1`` and ``m2``.
    """
    # Only beam_monitor_rate is rebound here; the other module-level names
    # (m1, m2, nickel, the counters) are read-only and need no declaration.
    global beam_monitor_rate
    mu = m2.getPosition() / 2.0
    nickel.set_wavelength_from_theta_pg002(mu)
    x = m1.getPosition()
    # NOTE(review): the meaning of the third argument (0.2) is defined by
    # CubicPowderSample.funx -- presumably a peak width; confirm there.
    y = nickel.funx(abs(x), abs(mu), 0.2)
    baseline = 1000
    height = int(5000 * y) + baseline
    beam_monitor_rate = height
    # Single-argument print(...) gives the same text on Python 2 and also
    # parses on Python 3.
    print("My Height at %s is %s" % (x, height))
    if beam_monitor_counter:
        beam_monitor_counter.setCountRate(height)
    if detector_counter:
        stopCallbackS2(m1)
    print("Motor M1, M2 at: %s %s , wavelength= %s , mEv= %s" % (
        m1.getPosition(), m2.getPosition(), nickel.wavelength, nickel.mEv))
def stopCallbackS2(motor):
    """Recompute the simulated detector count rate from the S2 position.

    ``nickel.calc`` evaluated at the absolute s2 position scales the current
    ``beam_monitor_rate``; a baseline of 2000 is added and the result is
    pushed to ``detector_counter`` when one is set.

    If ``beam_monitor_rate`` has not been computed yet (it is ``None``),
    m1's stop callback is invoked first so that it gets a value.

    motor -- the motor that triggered the callback.  Unused: the position is
             always read from the module global ``s2``.
    """
    # No ``global`` statement is needed: nothing is rebound here, and the
    # read of beam_monitor_rate below sees whatever m1's callback stored.
    if beam_monitor_rate is None:
        m1.stopCallback(m1)
    x = s2.getPosition()
    y = nickel.calc(abs(x))
    baseline = 2000
    height = int(beam_monitor_rate * y) + baseline
    # Single-argument print(...) gives the same text on Python 2 and also
    # parses on Python 3.
    print("My Height at %s is %s" % (x, height))
    if detector_counter:
        detector_counter.setCountRate(height)
    print("Motor S2 at: %s" % (s2.getPosition(),))
def device_iterator():
    """Step the three tracked motors (m1, m2, s2) with one shared timestamp.

    The same ``time.time()`` value is passed to each motor's ``doIteration``
    so all three are advanced against an identical clock reading.
    """
    timestamp = time.time()
    for motor in (m1, m2, s2):
        motor.doIteration(timestamp)
def display_iterator():
    """Redraw the table of motor positions on the curses screen.

    Each controller in ``devices`` (sorted by name) gets a 13-character-wide
    column: its name on row 0, then one row per motor (sorted by key)
    showing ``"<key>:<position>"``.  A ``*`` ten characters into the column
    marks every motor whose ``motorState`` is not ``"OFF"``; a blank is
    written otherwise so a stale marker is erased.

    Errors raised while drawing propagate to the caller.  The screen is
    refreshed in all cases, and a failure of the refresh itself is ignored.
    """
    column_width = 13
    marker_offset = 10
    window = screen.stdscr
    try:
        for column, name in enumerate(sorted(devices)):
            left = column_width * column
            motors = devices[name].motors
            window.addstr(0, left, name)
            for row, key in enumerate(sorted(motors), 1):
                motor = motors[key]
                window.addstr(row, left,
                              "%1s:%8.3f" % (key, motor.getPosition()))
                marker = " " if motor.motorState == "OFF" else "*"
                window.addstr(row, left + marker_offset, marker)
    finally:
        # Best effort: show whatever was drawn.  Only ordinary errors are
        # suppressed, so KeyboardInterrupt/SystemExit still get through.
        try:
            window.refresh()
        except Exception:
            pass
class MyScreen(Screen):
    """Screen that routes typed ``<device>:<command>`` lines to a controller.

    The device part is looked up in the module-level ``devices`` mapping.
    Before the command is delivered, this screen is installed as the
    controller's ``protocol``.
    NOTE(review): presumably the controller then replies through
    ``write()`` -- confirm against the device implementation.
    """

    def __init__(self, stdscr):
        Screen.__init__(self, stdscr)

    def sendLine(self, txt):
        """Dispatch one line of the form ``<device>:<command>``.

        Only the first ``:`` separates the device name from the command, so
        the command itself may contain colons.  A line without any ``:`` or
        naming an unknown device is reported on stdout and otherwise ignored.

        txt -- the text line to dispatch.
        """
        dev, sep, cmd = txt.partition(":")
        if not sep:
            # Nothing to route to: there is no device prefix at all.
            print("Ignoring input without a device prefix: %s" % (txt,))
            return
        print("Dev: %s Cmd: %s" % (dev, cmd))
        if dev in devices:
            myDev = devices[dev]
            myDev.protocol = self
            myDev.dataReceived(cmd)

    def write(self, txt):
        """Append ``" => " + txt`` to the most recent line of the screen.

        The last entry of ``self.lines`` is removed and re-added, extended,
        through ``addLine``.  This is best effort: any ordinary error (for
        example, no line to extend yet) is ignored.
        """
        try:
            newLine = self.lines[-1] + " => " + txt
            del self.lines[-1]
            self.addLine(newLine)
        except Exception:
            pass
if __name__ == "__main__":
    # Everything below stays at module scope on purpose: the callbacks and
    # iterators defined above read these names as module globals.

    # First TCP port of the six-controller range for each instrument.
    basePorts = {
        "Echidna": 62034,
        "Wombat": 62134,
        "Quokka": 62430,
        "Taipan": 62730,
    }
    basePort = basePorts["Taipan"]

    # To log to the terminal instead: log.startLogging(sys.stdout)
    # The log file is meant to stay open for the lifetime of the process.
    log.startLogging(open(("/tmp/Fake_Galil_%d.log" % basePort), "w"))

    # The counters stay None until the initial callbacks below have run, so
    # those first invocations only compute rates and do not push them.
    beam_monitor_counter = None
    detector_counter = None
    beam_monitor_rate = None
    nickel = CubicPowderSample()

    # One fake controller per port: mc1 .. mc6 on basePort .. basePort + 5.
    devices = {}
    for dev in range(6):
        port = basePort + dev
        controllerName = "mc%d" % (dev + 1)
        factory = GalilFactory(port)
        devices[controllerName] = factory.device
        reactor.listenTCP(port, factory)

    # Hook the three motors that drive the simulated count rates.
    s2 = devices["mc2"].motors["F"]
    s2.moveCallback = s2.stopCallback = stopCallbackS2
    m1 = devices["mc3"].motors["E"]
    m1.moveCallback = m1.stopCallback = stopCallbackM1M2
    m2 = devices["mc1"].motors["F"]
    m2.moveCallback = m2.stopCallback = stopCallbackM1M2

    # Prime the rates from the initial motor positions.
    m1.stopCallback(m1)
    m2.stopCallback(m2)
    s2.stopCallback(s2)

    beam_monitor_counter = CounterClientFactory()
    detector_counter = CounterClientFactory()
    connector1 = reactor.connectTCP("localhost", 33001, beam_monitor_counter)
    connector2 = reactor.connectTCP("localhost", 33000, detector_counter)
    print("Connector1: %s" % (connector1.getDestination(),))
    print("Connector2: %s" % (connector2.getDestination(),))

    stdscr = curses.initscr()
    try:
        screen = MyScreen(stdscr)
        reactor.addReader(screen)
        lc = LoopingCall(device_iterator)
        lc.start(0.05)
        dc = LoopingCall(display_iterator)
        dc.start(0.5)
        reactor.run()
    finally:
        # Hand the terminal back in a usable state, unless curses has
        # already been shut down elsewhere (a second endwin() would fail).
        if not curses.isendwin():
            curses.endwin()