256 lines
8.2 KiB
Python
256 lines
8.2 KiB
Python
###################################################################################################
|
|
# Deployment specific global definitions - executed after startup.py
|
|
###################################################################################################
|
|
|
|
#Uncomment this line to create the simulated devices needed for the tutorial scripts.
|
|
#run("tutorial/devices")
|
|
|
|
|
|
# The code below is necessary to run the Testing List plug-in:
|
|
# NOTE(review): in standard Python a `global` statement at module level has no
# effect. Presumably this file is executed by the host inside a non-module
# scope, where the declaration makes these names global - confirm before removing.
global print_log, sendFeedback, sys, inspect, os, traceback
|
|
import sys, inspect, os, traceback
|
|
|
|
class TestingTool:
    """
    Common tools for running a test.

    Stores the identity of one test run (device name, test name, test path)
    together with its parameters, and offers helpers to read parameters, to
    log messages and to hand the result back to the caller.

    NOTE(review): ``log`` and ``sendFeedback`` call the names ``log`` and
    ``set_return``, which are not defined in this file; they are presumably
    provided by the pshell scripting environment - confirm.
    """

    # Labels indexed by severity: 0 = Info, 1 = Warning, 2 = Error.
    _SEVERITY_LABELS = ('INF', 'WAR', 'ERR')

    def __init__(self, testName, testPath, deviceName, testParams):
        """
        Init class with test data and log a separator line.

        testName   = the name of the test
        testPath   = the path of the test
        deviceName = the name of the device under test
        testParams = mapping: parameter name -> mapping with the keys
                     'value' and 'description'
        """
        self.deviceName = deviceName
        self.testName = testName
        self.testPath = testPath
        self.testParams = testParams
        print('\n')
        self.log('=' * 80)

    def getParam(self, paramName):
        """
        Get a specific test parameter.

        paramName = the name of the parameter for which the value must be read

        Returns the parameter's 'value' entry, or None (after logging the
        reason) when the parameter cannot be retrieved.
        """
        try:
            return self.testParams[paramName]["value"]
        except Exception:
            # str() so that a non-string parameter name cannot make the
            # error report itself fail.
            self.log('Could not retrieve testing parameter ' + str(paramName)
                     + ' - Details: ' + traceback.format_exc())
            return None

    def getName(self):
        """
        Get test name.
        """
        return self.testName

    def getPlotName(self):
        """
        Get test plot name: '<device name> - <test name>'.
        """
        return self.deviceName + ' - ' + self.testName

    def getDeviceName(self):
        """
        Get device name.
        """
        return self.deviceName

    def getPath(self):
        """
        Get test path.
        """
        return self.testPath

    def _formatLog(self, text, severity):
        """
        Build one log line: '<label> - <device name> <test name>: <text>'.

        text     = the message; converted with str()
        severity = 0:Info, 1:Warning, 2:Error; a value that is not a valid
                   index into the label table is reported as Error instead
                   of raising
        """
        labels = self._SEVERITY_LABELS
        try:
            label = labels[severity]
        except (IndexError, TypeError):
            label = labels[-1]
        return label + ' - ' + self.deviceName + ' ' + self.testName + ': ' + str(text)

    def log(self, text, severity=0):
        """
        Print/log information.

        text     = the string to be printed/logged
        severity = 0:Info, 1:Warning, 2:Error

        The line is printed with a timestamp and then passed to the global
        ``log`` function. If that call fails it is retried once after one
        second; a second failure is only reported on the console.
        """
        import time
        now = time.strftime('%Y.%m.%d %H:%M:%S')
        logText = self._formatLog(text, severity)
        print(now + ' ' + logText)
        # The bare ``except`` clauses below are deliberate: writing the log is
        # best effort and must never abort the test.
        # NOTE(review): presumably the host-side log() can raise errors that
        # do not derive from Exception (e.g. Java exceptions under Jython) -
        # confirm before narrowing.
        try:
            log(logText)
        except:
            # cannot write log. maybe busy? Wait and retry
            time.sleep(1)
            try:
                log(logText)
            except:
                # cannot write log
                print(now + ' ' + self._formatLog('cannot write Log in Data', 2))

    def printParams(self):
        """
        Print/log the test parameters as a table.

        One row is logged per entry of the parameters passed at construction:
        name, 'value' and 'description'.
        """
        columnsFormat = '{0:17} {1:>20} {2}'
        self.log('-' * 80)
        self.log(columnsFormat.format("Parameter", "Value", "Description"))
        self.log('-' * 80)
        for key in self.testParams:
            entry = self.testParams[key]
            self.log(columnsFormat.format(key, entry['value'], entry['description']))
        self.log('-' * 80)

    def sendFeedback(self, returnString, testPassed):
        """
        Prepare and send feedback to calling tool.

        returnString = the string containing info on the test result
        testPassed   = true if the test was successful, false if test had a problem

        Logs the outcome (as Info when passed, as Error when failed) and then
        calls ``set_return`` with
        [testPath, deviceName, returnString, testPassed].
        """
        self.log('End of test. Result:')
        if testPassed:
            severity = 0
            self.log('**** TEST PASSED ****', severity)
        else:
            severity = 2
            self.log('**** TEST FAILED ****', severity)
        if returnString != '':
            self.log(returnString, severity)
        set_return([self.testPath, self.deviceName, returnString, testPassed])
|
|
|
|
|
|
|
|
# RPS-Test Classes
|
|
class Interlock:
    """
    Class for getting / setting the interlock mode.

    Every request starts from a 28-byte template (``msg_default``) whose byte
    16 selects the io function. Requests are sent through ``UDPDatagram``.
    """
    # Kept at class level so that ``Interlock.array`` stays available. Names
    # bound in a class body are not visible from inside the methods, so each
    # method that needs ``array`` imports it itself.
    from array import array

    # mode, test pattern for outputs 2-4, test pattern for output 5, expected pattern for 2-4, expected pattern for 5
    rps_mask1 = 0x4440  # mask for outputs 2-4
    rps_mask2 = 0x0004  # mask for output 5
    msg_default = bytearray()  # placeholder; replaced per instance in __init__

    def __init__(self):
        """
        Create the message template.

        Message composition at position:
        0: 0x32 = 50 = byte count
        2: 0x69 = 105 = transfer function
        4,5: 0x99,0x88 = this applications port number (swapped)
        7: 1 = message id
        16: io-func: reset=2; read mode=0; write mode=1

        NOTE(review): in the template below the value 1 sits at index 6, not
        at index 7 as listed above - confirm which of the two is intended.
        """
        from array import array
        self.msg_default = array('B', [
            0x32, 0, 0x69, 0, 0x99, 0x88, 1, 0,  # bytes 0-7
            0, 0, 0, 0, 0, 0, 0, 0,              # bytes 8-15
            1, 0, 0, 0, 0, 0, 0, 0,              # bytes 16-23
            0, 0, 0, 0,                          # bytes 24-27
        ])

    def _buildMsg(self, ioFunc):
        """
        Return a fresh copy of the template with byte 16 set to ioFunc.

        A copy is returned so that callers may extend the message without
        altering the template used for later requests.
        """
        msg = self.msg_default[:]
        msg[16] = ioFunc
        return msg

    def _msgRead(self):
        """Return a new 'read mode' message (io-func 0)."""
        return self._buildMsg(0)

    def _msgWrite(self):
        """Return a new 'write mode' message (io-func 1)."""
        return self._buildMsg(1)

    def _msgReset(self):
        """Return a new 'reset' message (io-func 2)."""
        return self._buildMsg(2)

    def setInterlockMode(self, mode):
        """
        Send a 'write mode' message followed by the given mode bytes.

        mode = sequence of unsigned byte values appended to the message

        Returns True when the reply from UDPDatagram.send is truthy,
        False otherwise.
        """
        import binascii
        from array import array
        udp = UDPDatagram()
        msg = self._msgWrite()
        msg.extend(array('B', mode))
        print('setInterlockMode %s' % binascii.hexlify(msg))
        return bool(udp.send(msg))

    def getInterlockMode(self):
        """
        Send a 'read mode' message and return whatever UDPDatagram.send returns.
        """
        import binascii
        udp = UDPDatagram()
        msg = self._msgRead()
        print('getInterlockMode %s' % binascii.hexlify(msg))
        rcv = udp.send(msg)
        return rcv

    def masterReset(self):
        """
        Send a 'reset' message.

        Returns True when the reply from UDPDatagram.send is truthy,
        False otherwise.
        """
        import binascii
        udp = UDPDatagram()
        msg = self._msgReset()
        print('masterReset %s' % binascii.hexlify(msg))
        return bool(udp.send(msg))
|
|
|
|
class UDPDatagram:
    """
    Class for receiving and sending a UDP message.

    NOTE: in this version all real socket calls are commented out. The
    methods only print what they would do, and ``send`` always returns a
    fixed dummy reply.
    """
    import socket  # only referenced by the commented-out socket code below

    def __init__(self):
        """
        Store the server address; exits the interpreter if set-up fails.
        """
        # communication with interlock server
        self.serverPort = 0xBB1B    # interlock server
        self.serverName = 'PROMC1'  # ball server
        try:
            print('here the socket should be open')
            #self.sock = socket.socket(socket.AF_INET,     # Internet
            #                          socket.SOCK_DGRAM)  # UDP
        #except socket.error, msg:
        except Exception:
            #print 'Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
            sys.exit()

    def _sendUDP(self, message):
        """
        Print the destination and the message as hex (sending is disabled).

        message = bytes-like object to be sent
        """
        import binascii
        try:
            # get the ip address and send
            print('here should sendto: %s %s' % (self.serverName, self.serverPort))
            print('message:  %s' % binascii.hexlify(message))
            #self.sock.sendto(message, (self.serverName, self.serverPort))
        #except socket.error, msg:
        except Exception:
            #print 'Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
            sys.exit()

    def _listenInit(self):
        """
        Print the bind messages (the actual bind is disabled).
        """
        try:
            print('here should bind to socket')
            #self.sock.bind('', self.ThisPort)
        #except socket.error, msg:
        except Exception:
            #print 'Bind failed. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
            sys.exit()
        print('Socket bind complete. ')

    def _listen(self):
        """
        Return the reply to the last request.

        Receiving is disabled, so a fixed dummy reply of 50 elements, each 9,
        is returned.
        """
        # now keep talking with the client
        print('here should receive data')
        # Disabled receive loop, kept for reference:
        #while 1:
        #    # receive data from client (data, addr)
        #    data, addr = self.sock.recvfrom(1024)
        #    if not data:
        #        print 'End of data or no data'
        #        break
        #    print 'Message[' + addr[0] + ':' + str(addr[1]) + '] - ' + data.strip()
        #self.sock.close()
        #return data[28:]
        return [9] * 50

    def send(self, message):
        """
        Run bind, send and receive in sequence and return the reply.

        message = bytes-like object to be sent
        """
        self._listenInit()
        self._sendUDP(message)
        return self._listen()
|
|
|