Files
ncs/script/local.py
boccioli_m ece149f2f5 Closedown
2017-10-10 13:36:58 +02:00

216 lines
6.7 KiB
Python

###################################################################################################
# Deployment specific global definitions - executed after startup.py
###################################################################################################
#Uncomment this line to create the simulated devices needed to the tutorial scripts.
#run("tutorial/devices")
# The code below is necessary to run the Testing List plug-in:
global print_log, sendFeedback, sys, inspect, os, traceback
import sys, inspect, os, traceback
class TestingTool:
"""
Common tools for running the test
"""
def __init__(self, testName, testPath, deviceName, testParams):
"""
Init class with test data
"""
self.deviceName = deviceName
self.testName = testName
self.testPath = testPath
self.testParams = testParams
def getParam(self, paramName):
"""
get specific test parameter
paramName = the name of the parameter for which the value must be read
"""
try:
return self.testParams[paramName]["value"]
except:
self.log('Could not retrieve testing parameter ' + paramName + ' - Details: ' + traceback.format_exc())
return None
def getName(self):
"""
get test name
"""
return self.testName
def getPlotName(self):
"""
get test plot name
"""
return self.deviceName + ' - ' + self.testName
def getDeviceName(self):
"""
get device name
"""
return self.deviceName
def getPath(self):
"""
get test path
"""
return self.testPath
def log(self, text):
"""
Print/log information
text = the string to be printed/logged
"""
import time
time.ctime()
now = time.strftime('%Y.%m.%d %H:%M:%S')
logText = self.deviceName + ' - ' + self.testName + ': ' + str(text)
print now + ' ' + logText
try:
log(logText)
except:
#cannot write log. maybe busy? Wait and retry
time.sleep(1)
try:
log(logText)
except:
#cannot write log
print now + ' ' + self.deviceName + ' - ' + self.testName + ': ' + 'cannot write Log in Data'
def printParams(self):
"""
Print/log the pshell parameters
params = the JSON object with the parameters passed by pshell
"""
columnsFormat = '{0:20} {1:>7} {2}'
self.log(columnsFormat.format("Parameter", "Value", "Description"))
for key in self.testParams.keys():
self.log(columnsFormat.format(key,
self.testParams[key]['value'],
self.testParams[key]['description']))
def sendFeedback(self, returnString, testPassed):
"""
Prepare and send feedback to calling tool
returnString = the string containing info on the test result
testPassed = true if the test was successful, false if test had a problem
"""
self.log('End of test. Result:')
#self.log('Device: ' + self.deviceName)
#self.log('Test name: ' + self.testName)
#self.log('Test path: ' + self.testPath)
if testPassed:
self.log('**** TEST PASSED ****')
else:
self.log('**** TEST FAILED ****')
self.log('Return message: ' + returnString)
ret = [self.testPath, self.deviceName, returnString, testPassed]
set_return( ret)
class Interlock:
# data acquisition definitions
IO_READ = 1
sIO_WRITE = 2
IO_DIGITAL = 3
# mode, test pattern for outputs 2-4, test pattern for output 5, expected pattern for 2-4, expected pattern for 5
rps_beamline = 0
rps_mode = 1
rps_expected1 = 2
rps_expected2 = 3
rps_mask1 = 0x4440 # mask for outputs 2-4
rps_mask2 = 0x0004 # mask for output 5
def __init__(self):
self.log('ecco pio')
def log(self, text):
print str(text)
def setInterlockMode(self, mode)
i = 0
d = UDPDatagram()
msg = byte[128]
msg[0] = 50 # bytecount
msg[2] = 105 # transfer function
msg[4] = (byte) PORT1
msg[5] = (byte) (PORT1 >>> 8)
msg[6] = 1 # message id
msg[16] = 1 # io-func reset=2 read mode=0 write mode=1
buffer = mode.getBytes()
while i < buffer.length:
msg[28 + i] = buffer[i]
i = i+1
d.listenInit()
d.send(Server, msg)
mess = d.listen()
if mess == null:
return false
else:
return true
# ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** **
# class for receiving and sending a udp message
# ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** **
class UDPDatagram:
import socket
# the constructor
def __init__(self):
# communication with interlock server
self.serverPort = 0xBB1B # interlock server
self.thisPort = 0x8899 # this port
self.serverName = 'PROMC1' # ball server
try:
self.sock = socket.socket(socket.AF_INET, # Internet
socket.SOCK_DGRAM) # UDP
except socket.error, msg:
print 'Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
sys.exit()
# sending
def _sendUDP(self, recipient, message):
try:
# get the ip address and send
ipAddress = recipient
self.sock.sendto(message, (ipAddress, PORT))
except socket.error, msg:
print 'Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
sys.exit()
def _listenInit(self):
try:
self.sock.bind('', self.ThisPort)
except socket.error, msg:
print 'Bind failed. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
sys.exit()
print 'Socket bind complete. '
def _listen(self):
# now keep talking with the client
while 1:
# receive data from client (data, addr)
data, addr = self.sock.recvfrom(1024)
if not data:
print 'End of data or no data'
break
print 'Message[' + addr[0] + ':' + str(addr[1]) + '] - ' + data.strip()
self.sock.close()
return data[28:]